Я использую асинхронный клиент Apache HTTP для выполнения HTTP-запросов (путем потребления данных из шины Kafka) и обработки ответов в обратных вызовах.
В своем журнале я обнаружил некоторые интересные варианты поведения обратных вызовов и хотел бы выяснить, почему это так.
Вот часть кода (потребляющая данные из кафки и выполняющая http-запрос):
while (true) {
ConsumerRecords<String, MyMessage> records = kafkaConsumer.poll(1000);
if (records.isEmpty()) {
logger.info("Polling Empty ....");
continue;
}
int numberOfRecords = records.count();
final CountDownLatch batchLatch = new CountDownLatch(numberOfRecords);
for (ConsumerRecord<String, MyMessage> record: records) {
final HttpPost request = new HttpPost(endpoint);
// set headers
// set entity
request.setEntity(messageToEntity(record.value()));
httpClient.execute(request, someCallback);
}
batchLatch.await();
kafkaConsumer.commitAsync();
}
А вот пример кода обратного вызова:
public class SomeCallback {
// ...
@Override
public void completed(final HttpResponse response) {
// do something
logger.info("{} - blablabla", status, ...);
latch.countDown();
}
@Override
public void failed(final Exception ex) {
// do something
logger.error("{} - blablabla", FAILED, ..., ex);
latch.countDown();
}
@Override
public void cancelled() {
// do something
logger.error("{} - blablabla!", CANCELLED, ...);
latch.countDown();
}
}
Итак, ситуация такова, что в какой-то момент я запустил программу, и она начала потреблять данные из кафки. Максимальное количество записей на опрос - 500.
А поскольку на Kafka уже есть много неиспользованных данных, программа столкнулась с высокой пропускной способностью.
Теперь журнал выглядит так:
[INFO ] 2018-04-29 04:11:29.234 [main] MyClass - Polling got records: 500
[INFO ] 2018-04-29 04:11:30.362 [I/O dispatcher 1] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 2] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 1] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 1] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 2] SUCCESS blablabla
...
...
[INFO ] 2018-04-29 06:28:35.003 [main] MyClass - Polling got records: 500
[INFO ] 2018-04-29 06:28:35.363 [I/O dispatcher 386] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 385] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 386] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 385] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 386] SUCCESS blablabla
...
[INFO ] 2018-04-29 06:31:35.003 [main] MyClass - Polling got records: 500
[INFO ] 2018-04-29 06:32:12.418 [I/O dispatcher 405] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 406] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 405] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 406] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 405] SUCCESS blablabla
...
[ERROR] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 405] FAILED blablabla! org.apache.http.ConnectionClosedException: Connection closed unexpectedly
[ERROR] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 406] FAILED blablabla! org.apache.http.ConnectionClosedException: Connection closed unexpectedly
// there the program stopped
Вот чего я не могу понять:
Спасибо за комментарии Олега, я обнаружил, что количество диспетчеров ввода-вывода постоянно увеличивается, потому что каждый раз, когда я получаю данные от Kafka, создается новый http-клиент. Тогда будет много простаивающих клиентов, занимающих операции ввода-вывода и ресурсы. По этой же причине программа остановилась.
У меня еще остались вопросы:
@oleg да, я. Я тоже только что нашел эту ошибку. Клиент не был закрыт и создан новый клиент. Потом у нас появилось много неработающих клиентов. Поэтому количество диспетчеров io увеличивается. Спасибо.
Вы действительно очень должны повторно использовать один и тот же экземпляр HttpAsyncClient для всех логически связанных запросов.
@oleg У меня остались вопросы: 1. Что означает номер диспетчера ввода / вывода? это количество разных потоков? 2. как я могу контролировать максимальное количество диспетчеров ввода-вывода для одного http-клиента? это от I/O dispatcher
? 3. В чем разница между этим номер диспетчера io и количество подключений? Спасибо !
Вы можете обновить свой вопрос, и я отвечу на все вопросы?
@oleg Добавил обновления. Спасибо !
Он представляет собой имя потока.
Свойства и поведение реактора ввода-вывода можно контролировать с помощью IOReactorConfig
Небольшое количество диспетчеров ввода-вывода управляет гораздо большим количеством соединений.
Скорее всего, один поток диспетчеризации ввода-вывода на ядро ЦП является разумным значением по умолчанию для большинства приложений, и его не следует изменять.
Вы ведь не создаете каждый раз новый экземпляр HttpAsyncClient?