Reactor Netty: соединение преждевременно закрыто ДО ответа веб-клиента

При попытке обработать данные, полученные из WebClient в случайный момент времени, я получаю ошибку «Соединение преждевременно закрыто ДО ответа». Страницы содержат довольно много данных, которые впоследствии обрабатываются (в основном обновления и вставки БД). Через случайное время я получаю сообщение об ошибке:

**Reactor Netty: соединение преждевременно закрыто ДО ответа веб-клиента.**

Рассматриваемый метод заключается в следующем:

  private Mono<String> migrateSomeData() {
    return getAllPages()
        .flatMapIterable(Page::getItems)
        .filter(this::isValidItem)
        .doOnNext(item -> doSomeLogging())
        .filter(item-> this.checkIfAlreadyProcessed(item))
        .flatMap(this::mapToDto)
        .flatMap(this::persist)
        .doOnNext(this::saveAsProcessed)
        .collectList()
        .map(this::getTotalAmountOfProcessed);
  }

В коде были опущены некоторые части, связанные с пивоварением. getAllPages использует метод Flux .expand для рекурсивного вызова следующих страниц. Это проверено и работает корректно. Еще одна важная вещь, которую следует отметить: все операции с базой данных блокируются, поскольку мы не реализовали реактивный jdbc.

Я попытался обновить конфигурацию веб-клиента:

@Bean
  public WebClient someClient(
      ReactiveOAuth2AuthorizedClientManager clientAuthorizedClientManager) {

    ServerOAuth2AuthorizedClientExchangeFilterFunction oauth =
        new ServerOAuth2AuthorizedClientExchangeFilterFunction(clientAuthorizedClientManager);
    oauth.setDefaultClientRegistrationId("clientId");

    final int size = 16 * 1024 * 1024;
    final ExchangeStrategies strategies = buildClientWithExtendedResponseSize(size);

    return WebClient.builder()
        .defaultHeader("Accept", MediaType.APPLICATION_JSON_VALUE)
        .defaultHeader("subscription-key", someKeyValue)
        .clientConnector(createWiretappedClientHttpConnector(this.getClass()))
        .filter(oauth)
        .exchangeStrategies(strategies)
        .build();
  }

  private static ExchangeStrategies buildClientWithExtendedResponseSize(int size) {
    return ExchangeStrategies.builder()
        .codecs(codecs -> codecs.defaultCodecs().maxInMemorySize(size))
        .build();
  }

private ClientHttpConnector createWiretappedClientHttpConnector(Class<?> invokedClass) {
    HttpClient httpClient =
        HttpClient.create()
            .option(ChannelOption.SO_KEEPALIVE, true)
            .responseTimeout(Duration.ofMinutes(5))
            .doOnConnected(
                conn ->
                    conn.addHandlerLast(new ReadTimeoutHandler(5 * 60))
                        .addHandlerLast(new WriteTimeoutHandler(5 * 60)))
            .compress(true)
            .wiretap(
                invokedClass.getCanonicalName(), LogLevel.TRACE, AdvancedByteBufFormat.TEXTUAL);
    return new ReactorClientHttpConnector(httpClient);
  }

Я также постарался максимально сузить этот код, поэтому простите меня за структурные ошибки.

Я протестировал это на локальном имитируемом экземпляре API в Wiremock, и проблема все еще сохраняется, так что это должно исключить проблемы с конфигурацией сервера.

Возможно ли, что мой код обработки работает слишком медленно и что ответ, возвращенный веб-клиентами, не используется вовремя, и веб-клиент решает закрыть соединение?

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
0
647
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Извините за задержку с ответом. На странице GitHub реактора есть вопрос по этому поводу. Эта проблема выглядит примерно так: «клиент отправляет [ACK] и не отправляет [ACK,FIN] и сохраняет соединение открытым. Таким образом, соединение не закрывается клиентом, а затем используется повторно, что приводит к этой ошибке».

Решением должна быть установка максимального времени простоя (максимальное время, в течение которого это соединение остается бездействующим в пуле соединений). Предлагаемое значение — это значение, присвоенное клиентскому серверу keepAliveTimeout, но если это сбивает с толку, попробуйте что-нибудь маленькое и поиграйте. Итак, решение должно выглядеть так:

ConnectionProvider connectionProviderWithMaxIdleTime = ConnectionProvider.builder("withMaxIdleTime")
                .maxIdleTime(Duration.ofSeconds(120)) //if issue persists reduce this
                .build();
HttpClient httpClient = HttpClient.create(connectionProviderWithMaxIdleTime)
                .option(ChannelOption.SO_KEEPALIVE, true)
                {...the rest of your code...}

Надеюсь это поможет!

Кажется, это сработало! Однако мне пришлось установить значение 15 секунд для времени простоя. Кажется, это исправило проблему, но мне понадобится несколько дней, чтобы проверить поведение. Является ли такое низкое значение нормальным?

benjaminv2 27.04.2024 11:51

Я думаю, это связано с вашим вариантом использования. Насколько я понимаю, если клиент устанавливает соединение с сервером, делает запрос, последний сигнал не является FIN, поэтому соединение остается открытым для большего количества запросов от одного и того же клиента до тех пор, пока не будет достигнута настройка времени простоя (или функция KeepAlive). значение с сервера). Поэтому, если вы ожидаете большего количества запросов от этого клиента на тот же сервер, возможно, вам нужно, например, более высокое значение. Чтобы избежать затрат на повторное подключение к серверу. Если этот клиент делает только 1 запрос, вы можете снизить его. Однако в таких изменениях я всегда запускал и тесты производительности...

stelios.anastasakis 27.04.2024 14:08

После проверки поведения сообщите нам об этом в комментариях для целей документирования. И если это именно то решение, которое вы искали, отметьте его как решенное, еще раз для дальнейшего использования! Спасибо

stelios.anastasakis 27.04.2024 14:10

Клиент много раз связывается с одним и тем же сервером, поскольку это ответ с разбивкой на страницы, и я обрабатываю все доступные данные. Но это значение, похоже, работает, и производительность, насколько я вижу, не пострадала. Никаких ошибок сегодня не было, так что это здорово, с этим возникли некоторые проблемы.

benjaminv2 27.04.2024 22:26

Вы также можете проверить projectreactor.io/docs/netty/release/reference/…

Violeta Georgieva 29.04.2024 14:23

@stelios.anastasakis После 5 дней нормальной работы обработки данных сегодня утром я снова столкнулся с ошибкой. Это определенно уменьшило количество случаев возникновения ошибок, но, похоже, не было полностью исправлено.

benjaminv2 03.05.2024 09:53

может нужно еще немного настроить? Например, играйте со значением, которое вы используете. Также попробуйте отладку с помощью руководств, которыми поделилась @VioletaGeorgieva. Могут быть и другие области, вызвавшие этот сбой, например, какая-то конфигурация целевого сервера. Если за много дней произошла одна ошибка, возможно, это что-то очень маленькое и специфическое.

stelios.anastasakis 10.05.2024 10:50

@stelios.anastasakis Я попросил установить на удаленном компьютере такие инструменты, как tcpdump, для устранения проблемы. Это действительно кажется очень конкретным, поскольку проблемы не возникало с тех пор, как я в последний раз оставлял здесь комментарий. Это все еще достаточно хорошо для текущего варианта использования, и я посмотрю, что произойдет дальше, когда я смогу его отладить. Я обновлю сообщение дополнительной информацией, как только она у меня появится, поскольку она может быть полезна тем, кто еще сталкивается с этой проблемой.

benjaminv2 10.05.2024 11:55

Другие вопросы по теме