При попытке обработать данные, полученные из WebClient в случайный момент времени, я получаю ошибку «Соединение преждевременно закрыто ДО ответа». Страницы содержат довольно много данных, которые впоследствии обрабатываются (в основном обновления и вставки БД). Через случайное время я получаю сообщение об ошибке:
**Reactor Netty: соединение преждевременно закрыто ДО ответа веб-клиента.**
Рассматриваемый метод заключается в следующем:
private Mono<String> migrateSomeData() {
return getAllPages()
.flatMapIterable(Page::getItems)
.filter(this::isValidItem)
.doOnNext(item -> doSomeLogging())
.filter(item-> this.checkIfAlreadyProcessed(item))
.flatMap(this::mapToDto)
.flatMap(this::persist)
.doOnNext(this::saveAsProcessed)
.collectList()
.map(this::getTotalAmountOfProcessed);
}
В коде были опущены некоторые части, связанные с пивоварением. getAllPages использует метод Flux .expand для рекурсивного вызова следующих страниц. Это проверено и работает корректно. Еще одна важная вещь, которую следует отметить: все операции с базой данных блокируются, поскольку мы не реализовали реактивный jdbc.
Я попытался обновить конфигурацию веб-клиента:
@Bean
public WebClient someClient(
ReactiveOAuth2AuthorizedClientManager clientAuthorizedClientManager) {
ServerOAuth2AuthorizedClientExchangeFilterFunction oauth =
new ServerOAuth2AuthorizedClientExchangeFilterFunction(clientAuthorizedClientManager);
oauth.setDefaultClientRegistrationId("clientId");
final int size = 16 * 1024 * 1024;
final ExchangeStrategies strategies = buildClientWithExtendedResponseSize(size);
return WebClient.builder()
.defaultHeader("Accept", MediaType.APPLICATION_JSON_VALUE)
.defaultHeader("subscription-key", someKeyValue)
.clientConnector(createWiretappedClientHttpConnector(this.getClass()))
.filter(oauth)
.exchangeStrategies(strategies)
.build();
}
private static ExchangeStrategies buildClientWithExtendedResponseSize(int size) {
return ExchangeStrategies.builder()
.codecs(codecs -> codecs.defaultCodecs().maxInMemorySize(size))
.build();
}
private ClientHttpConnector createWiretappedClientHttpConnector(Class<?> invokedClass) {
HttpClient httpClient =
HttpClient.create()
.option(ChannelOption.SO_KEEPALIVE, true)
.responseTimeout(Duration.ofMinutes(5))
.doOnConnected(
conn ->
conn.addHandlerLast(new ReadTimeoutHandler(5 * 60))
.addHandlerLast(new WriteTimeoutHandler(5 * 60)))
.compress(true)
.wiretap(
invokedClass.getCanonicalName(), LogLevel.TRACE, AdvancedByteBufFormat.TEXTUAL);
return new ReactorClientHttpConnector(httpClient);
}
Я также постарался максимально сузить этот код, поэтому простите меня за структурные ошибки.
Я протестировал это на локальном имитируемом экземпляре API в Wiremock, и проблема все еще сохраняется, так что это должно исключить проблемы с конфигурацией сервера.
Возможно ли, что мой код обработки работает слишком медленно и что ответ, возвращенный веб-клиентами, не используется вовремя, и веб-клиент решает закрыть соединение?




Извините за задержку с ответом. На странице GitHub реактора есть вопрос по этому поводу. Эта проблема выглядит примерно так: «клиент отправляет [ACK] и не отправляет [ACK,FIN] и сохраняет соединение открытым. Таким образом, соединение не закрывается клиентом, а затем используется повторно, что приводит к этой ошибке».
Решением должна быть установка максимального времени простоя (максимальное время, в течение которого это соединение остается бездействующим в пуле соединений). Предлагаемое значение — это значение, присвоенное клиентскому серверу keepAliveTimeout, но если это сбивает с толку, попробуйте что-нибудь маленькое и поиграйте. Итак, решение должно выглядеть так:
ConnectionProvider connectionProviderWithMaxIdleTime = ConnectionProvider.builder("withMaxIdleTime")
.maxIdleTime(Duration.ofSeconds(120)) //if issue persists reduce this
.build();
HttpClient httpClient = HttpClient.create(connectionProviderWithMaxIdleTime)
.option(ChannelOption.SO_KEEPALIVE, true)
{...the rest of your code...}
Надеюсь это поможет!
Я думаю, это связано с вашим вариантом использования. Насколько я понимаю, если клиент устанавливает соединение с сервером, делает запрос, последний сигнал не является FIN, поэтому соединение остается открытым для большего количества запросов от одного и того же клиента до тех пор, пока не будет достигнута настройка времени простоя (или функция KeepAlive). значение с сервера). Поэтому, если вы ожидаете большего количества запросов от этого клиента на тот же сервер, возможно, вам нужно, например, более высокое значение. Чтобы избежать затрат на повторное подключение к серверу. Если этот клиент делает только 1 запрос, вы можете снизить его. Однако в таких изменениях я всегда запускал и тесты производительности...
После проверки поведения сообщите нам об этом в комментариях для целей документирования. И если это именно то решение, которое вы искали, отметьте его как решенное, еще раз для дальнейшего использования! Спасибо
Клиент много раз связывается с одним и тем же сервером, поскольку это ответ с разбивкой на страницы, и я обрабатываю все доступные данные. Но это значение, похоже, работает, и производительность, насколько я вижу, не пострадала. Никаких ошибок сегодня не было, так что это здорово, с этим возникли некоторые проблемы.
Вы также можете проверить projectreactor.io/docs/netty/release/reference/…
@stelios.anastasakis После 5 дней нормальной работы обработки данных сегодня утром я снова столкнулся с ошибкой. Это определенно уменьшило количество случаев возникновения ошибок, но, похоже, не было полностью исправлено.
может нужно еще немного настроить? Например, играйте со значением, которое вы используете. Также попробуйте отладку с помощью руководств, которыми поделилась @VioletaGeorgieva. Могут быть и другие области, вызвавшие этот сбой, например, какая-то конфигурация целевого сервера. Если за много дней произошла одна ошибка, возможно, это что-то очень маленькое и специфическое.
@stelios.anastasakis Я попросил установить на удаленном компьютере такие инструменты, как tcpdump, для устранения проблемы. Это действительно кажется очень конкретным, поскольку проблемы не возникало с тех пор, как я в последний раз оставлял здесь комментарий. Это все еще достаточно хорошо для текущего варианта использования, и я посмотрю, что произойдет дальше, когда я смогу его отладить. Я обновлю сообщение дополнительной информацией, как только она у меня появится, поскольку она может быть полезна тем, кто еще сталкивается с этой проблемой.
Кажется, это сработало! Однако мне пришлось установить значение 15 секунд для времени простоя. Кажется, это исправило проблему, но мне понадобится несколько дней, чтобы проверить поведение. Является ли такое низкое значение нормальным?