Количество диспетчеров ввода-вывода в apache http async client

Я использую асинхронный клиент Apache HTTP для выполнения HTTP-запросов (путем потребления данных из шины Kafka) и обработки ответов в обратных вызовах.

В своем журнале я обнаружил некоторые интересные варианты поведения обратных вызовов и хотел бы выяснить, почему это так.

Вот часть кода (потребляющая данные из кафки и выполняющая http-запрос):

    while (true) {
        ConsumerRecords<String, MyMessage> records = kafkaConsumer.poll(1000);
        if (records.isEmpty()) {
            logger.info("Polling Empty ....");
            continue;
        }

        int numberOfRecords = records.count();
        final CountDownLatch batchLatch = new CountDownLatch(numberOfRecords);
        for (ConsumerRecord<String, MyMessage> record: records) {
            final HttpPost request = new HttpPost(endpoint);
            // set headers
            // set entity
            request.setEntity(messageToEntity(record.value()));
            httpClient.execute(request, someCallback);
        }

        batchLatch.await();
        kafkaConsumer.commitAsync();
    }

А вот пример кода обратного вызова:

public class SomeCallback {
    // ...
    @Override
    public void completed(final HttpResponse response) {
        // do something
        logger.info("{} - blablabla", status, ...);
        latch.countDown();
    }

    @Override
    public void failed(final Exception ex) {
        // do something
        logger.error("{} - blablabla", FAILED, ..., ex);
        latch.countDown();
    }

    @Override
    public void cancelled() {
        // do something
        logger.error("{} - blablabla!", CANCELLED, ...);
        latch.countDown();
    }
}

Итак, ситуация такова, что в какой-то момент я запустил программу, и она начала потреблять данные из кафки. Максимальное количество записей на опрос - 500.

А поскольку на Kafka уже есть много неиспользованных данных, программа столкнулась с высокой пропускной способностью.

Теперь журнал выглядит так:

[INFO ] 2018-04-29 04:11:29.234 [main] MyClass - Polling got records: 500
[INFO ] 2018-04-29 04:11:30.362 [I/O dispatcher 1] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 2] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 1] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 1] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 2] SUCCESS blablabla
...
...
[INFO ] 2018-04-29 06:28:35.003 [main] MyClass - Polling got records: 500
[INFO ] 2018-04-29 06:28:35.363 [I/O dispatcher 386] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 385] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 386] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 385] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 386] SUCCESS blablabla
...
[INFO ] 2018-04-29 06:31:35.003 [main] MyClass - Polling got records: 500
[INFO ] 2018-04-29 06:32:12.418 [I/O dispatcher 405] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 406] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 405] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 406] SUCCESS blablabla
[INFO ] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 405] SUCCESS blablabla
...
[ERROR] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 405] FAILED blablabla! org.apache.http.ConnectionClosedException: Connection closed unexpectedly
[ERROR] 2018-04-29 xx:xx:xx.xxx [I/O dispatcher 406] FAILED blablabla! org.apache.http.ConnectionClosedException: Connection closed unexpectedly
// there the program stopped

Вот чего я не могу понять:

  1. Почему на каждые 500 запросов (обработка ответов) приходится только два диспетчера ввода-вывода? Это из-за настройки по умолчанию макс 2?
  2. Почему количество диспетчеров ввода-вывода увеличивается на каждые 500 запросов? Обычно, что я испытал раньше, так это то, что количество диспетчеров ввода-вывода увеличится, но также уменьшится до 1 и 2. Я предполагаю, что: он будет повторно использовать некоторые из предыдущих диспетчеров ввода-вывода вместо каждого время создавать новые.
  3. Почему ConnectionClosedException в конце? Это потому, что слишком много диспетчеров ввода-вывода останавливают программу?

Обновлять

Спасибо за комментарии Олега, я обнаружил, что количество диспетчеров ввода-вывода постоянно увеличивается, потому что каждый раз, когда я получаю данные от Kafka, создается новый http-клиент. Тогда будет много простаивающих клиентов, занимающих операции ввода-вывода и ресурсы. По этой же причине программа остановилась.

У меня еще остались вопросы:

  1. Что означает номер диспетчера ввода-вывода, отображаемый в журналах? Это количество разных потоков?
  2. Как я могу контролировать максимальное количество диспетчеров ввода-вывода для одного http-клиента? это диспетчер ввода-вывода?
  3. В чем разница между количеством диспетчеров ввода-вывода и количеством подключений?
  4. Как оценить количество диспетчеров ввода-вывода и количество подключений, которые мне понадобятся, в зависимости от машины, на которой я запускаю программу, и пропускной способности / размера данных?

Вы ведь не создаете каждый раз новый экземпляр HttpAsyncClient?

ok2c 01.05.2018 14:53

@oleg да, я. Я тоже только что нашел эту ошибку. Клиент не был закрыт и создан новый клиент. Потом у нас появилось много неработающих клиентов. Поэтому количество диспетчеров io увеличивается. Спасибо.

fluency03 01.05.2018 15:25

Вы действительно очень должны повторно использовать один и тот же экземпляр HttpAsyncClient для всех логически связанных запросов.

ok2c 02.05.2018 09:26

@oleg У меня остались вопросы: 1. Что означает номер диспетчера ввода / вывода? это количество разных потоков? 2. как я могу контролировать максимальное количество диспетчеров ввода-вывода для одного http-клиента? это от I/O dispatcher? 3. В чем разница между этим номер диспетчера io и количество подключений? Спасибо !

fluency03 02.05.2018 10:07

Вы можете обновить свой вопрос, и я отвечу на все вопросы?

ok2c 02.05.2018 11:52

@oleg Добавил обновления. Спасибо !

fluency03 02.05.2018 12:12
Асинхронная передача данных с помощью sendBeacon в JavaScript
Асинхронная передача данных с помощью sendBeacon в JavaScript
В современных веб-приложениях отправка данных из JavaScript на стороне клиента на сервер является распространенной задачей. Одним из популярных...
2
6
2 192
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий
  1. Он представляет собой имя потока.

  2. Свойства и поведение реактора ввода-вывода можно контролировать с помощью IOReactorConfig

  3. Небольшое количество диспетчеров ввода-вывода управляет гораздо большим количеством соединений.

  4. Скорее всего, один поток диспетчеризации ввода-вывода на ядро ​​ЦП является разумным значением по умолчанию для большинства приложений, и его не следует изменять.

Другие вопросы по теме