Моя задача проста: мне нужно запросить скользящий индекс elasticsearch реактивным способом.
Поскольку @Document не поддерживает имя индекса с помощью Spring EL, например @Document(index = "indexName-#(new Date().format(yyyy-MM-dd))")
Итак, я пытаюсь вызвать elasticsearch с помощью ReactiveElasticsearchTemplate, который поддерживает изменение имени индекса во время выполнения.
Но поскольку объем данных превышает 10 000, мне нужно использовать прокрутку, чтобы повторять запрос, пока мы не получим все данные.
Я завершил первый запрос и запрос прокрутки, и он может вернуть значение.
Но мне нужно объединить все результаты и затем вернуться.
Как мне это сделать? На данный момент, когда потребитель все еще работает, пустой результат возвращается во внешний интерфейс. Как я могу попросить поток подождать, пока потребитель завершит поиск elasticsearch и вернет все данные? Спасибо.
public Flux<ELKModel> getByTradeDateBetween(LocalDateTime from, LocalDateTime to)
throws Exception {
List<ELKModel> result = new ArrayList<ELKModel>();
List<Long> total = new ArrayList<>();
List<Long> currentSize = new ArrayList<>();
List<String> scrollId = new ArrayList<>();
NativeSearchQueryBuilder sourceBuilder = new NativeSearchQueryBuilder();
sourceBuilder.withQuery(
QueryBuilders.boolQuery().must(QueryBuilders.rangeQuery(TRADE_DATE).gte(from).lte(to)));
sourceBuilder.withPageable(PageRequest.of(0, SINGLE_QUERY_SIZE));
NativeSearchQuery query = sourceBuilder.build();
elasticsearchSupport
.scrollStart(query, ELKModel.class)
.map(ELKModelWrapper::valueFrom).
subscribe(
wrapper -> {
total.add(wrapper.getTotal());l
currentSize.add(wrapper.getCurrentSize());
result.addAll(wrapper.getResults());
scrollId.add(wrapper.getScrollId());
}).dispose();
while (currentSize.size() == 1 && total.size() == 1 && currentSize.get(0) < total.get(0)) {
elasticsearchSupport
.scrollContinue(scrollId.get(0), ELKModel.class)
.map(ELKModelWrapper::valueFrom)
.subscribe(
wrapper -> {
currentSize.add(0, currentSize.get(0) + wrapper.getCurrentSize());
result.addAll(wrapper.getResults());
scrollId.add(0, wrapper.getScrollId());
}).dispose();
}
return Flux.fromIterable(result);
}




Поскольку вы используете реактивный подход с Flux, вам не следует блокировать с помощью оператора while. Вместо этого вам следует объединить свои реактивные операции в цепочку.
expand используется для выполнения повторных запросов до тех пор, пока мы не соберем все необходимые данные, применяя операцию прокруткиПродолжить только в том случае, если еще остались данные согласно сравнению общего и текущего размера.
Некоторая модификация вашего кода, но не обязательно работоспособная.
public Flux<ELKModel> getByTradeDateBetween(LocalDateTime from, LocalDateTime to) {
NativeSearchQueryBuilder sourceBuilder = new NativeSearchQueryBuilder();
sourceBuilder.withQuery(
QueryBuilders.boolQuery().must(QueryBuilders.rangeQuery(TRADE_DATE).gte(from).lte(to)));
sourceBuilder.withPageable(PageRequest.of(0, SINGLE_QUERY_SIZE));
NativeSearchQuery query = sourceBuilder.build();
return elasticsearchSupport.scrollStart(query, ELKModel.class)
.expand(wrapper -> {
if (wrapper.getTotal() > wrapper.getCurrentSize()) {
return elasticsearchSupport.scrollContinue(wrapper.getScrollId(), ELKModel.class)
.map(ELKModelWrapper::valueFrom);
} else {
return Flux.empty();
}
})
.flatMap(wrapper -> Flux.fromIterable(wrapper.getResults()));
}
Вы должны использовать довольно устаревшую версию Spring Data Elasticsearch. Поддержка SpEL для индексных имен в аннотации @Document была введена 4 года назад, вы можете увидеть, как это можно использовать, в моем посте https://www.sothawo.com/2020/07/how-to-provide-a -динамическое-имя-индекса-в-весне-данных-elasticsearch-using-spel/.
Что касается реактивности: вы никогда не должны блокировать поток в реактивном коде. И реактивный код выполняет эту прокрутку под капотом, я не понимаю, почему вы хотите сделать это самостоятельно.
очень ценю ваш ответ и ваш блог, на самом деле я уже пробовал SpEL раньше, но, возможно, это не удалось из-за грамматической ошибки SpEL, и это заставило меня выбрать сложный путь.
Большое спасибо за ваш совет, похоже, мне нужно заняться самостоятельным изучением реактивной реакции. :П