Я настраиваю Netty и не могу понять, какова цель опции pendingAcquireTimeout
. Использование WebClient
из Spring Framework 6.1.8 вместе с Netty 1.1.19.
В документации написано:
Максимальное время, до которого должно завершиться ожидающее получение или будет выдано исключение TimeoutException (разрешение: мс). Если указано -1, такой тайм-аут не применяется. По умолчанию: 45 секунд.
Допустим, у меня есть следующая конфигурация:
ConnectionProvider.Builder providerBuilder = ConnectionProvider.builder("custom")
.maxConnections(20) // pending queue size is 40 (2 x connection size)
.pendingAcquireTimeout(Duration.ofSeconds(45)); // this should be default
HttpClient httpClient = HttpClient.create(providerBuilder.build());
ReactorClientHttpConnector connector = new ReactorClientHttpConnector(httpClient);
WebClient webClient = WebClient.builder()
.baseUrl("http://127.0.0.1:8080")
.clientConnector(connector)
.build();
Теперь я вызываю 61 параллельный запрос к медленной конечной точке(Thread.sleep(Duration.ofSeconds(60))
).
Я ожидаю только одну ошибку, см. ниже. Есть 20 подключений в процессе, 40 в ожидании и 1 отключено.
Caused by: org.springframework.web.reactive.function.client.WebClientRequestException: Pending acquire queue has reached its maximum size of 40
Все идет нормально.
Вопрос в том, влияет ли опция pendingAcquireTimeout
на этот вариант использования вообще? Если да, то как?
Моя гипотеза заключалась в том, что запросы будут оставаться в очереди ожидания только в течение определенных 45 секунд и выдавать исключение, потому что конечная точка слишком медленная. Но этого не произошло, их постепенно обслуживают.
Что ж, свойство pendingAcquireTimeout
ведет себя так, как ожидалось. Получение этой ошибки:
Error org.springframework.web.reactive.function.client.WebClientRequestException: Pool#acquire(Duration) has been pending for more than the configured timeout of 45000ms
Мне это пока не до конца ясно, но основная причина — состояние гонки. При одновременном запуске 61 потока создается несколько провайдеров соединений. Более того, дополнительные провайдеры не учитывают мою конфигурацию.
final Runnable callWebClient = () -> {
logger.info("Started");
try {
final var result = webClient.get()
.uri("/blocking")
.retrieve()
.bodyToMono(String.class)
.block();
logger.info("Finished, {}", result);
} catch (Exception e) {
logger.error("Error", e);
}
};
final ThreadFactory factory = Thread.ofVirtual().name("routine-", 0).factory();
try (var executor = Executors.newThreadPerTaskExecutor(factory)) {
IntStream.range(0, 61)
.forEach(i -> executor.submit(callWebClient));
} catch (Exception e) {
logger.error("Error", e);
}
Следующая простая задержка исправляет это:
IntStream.range(0, 61)
.forEach(i -> {
executor.submit(callWebClient);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
});