Я развернул кластер Flink как приложение пряжи. В рамках конфигурации пряжи я связал 32 виртуальных ядра с каждым диспетчером задач. Я также выделил по 2 слота для каждого диспетчера задач.
Конвейер заданий: источник Kafka > Фильтр > RichMapFunction > TumblingWindow (время обработки) > Приемник
Я установил для параметров «Параллелизм» и «Максимальный параллелизм» значение 8 и обеспечил, чтобы моя функция keyBy равномерно распределяла сообщения по каждой подзадаче, как показано на рисунке ниже:
Ниже представлена визуализация того, как я представляю себе размещение параллельных экземпляров операторов. Основываясь на том, что я прочитал в документации Flink здесь.
Вопрос 1: Это правильно? А во-вторых, каждый круг будет выполняться на отдельном потоке?
Загрузка ЦП TaskManager едва достигает 7%, поэтому он явно не использует все доступные ядра.
Вопрос 2. Чтобы увеличить загрузку ЦП одним диспетчером задач, следует ли мне увеличить количество слотов для каждого диспетчера задач?
Вопрос 3. Приведет ли увеличение параллелизма к снижению показателя занятости?
Мой последний вопрос касается использования мной функции агрегатора в окне времени обработки переворачивающихся данных. Он имеет хеш-карту, в которой хранятся последние значения сообщений на основе их идентификаторов, где ключом является идентификатор, а значением сообщения. Затем выходные данные передаются в RichAsyncFunc, который сохраняет эти значения в БД. Длина окна составляет 200 мс, это позволяет мне отправлять только 4 запроса к БД в секунду для данного экземпляра приемника, в отличие от каждого сообщения, которого было бы слишком много.
Вопрос 4. Подходит ли эта функция для этой работы? Или будет более эффективный подход к использованию.
Спасибо за прочтение!





Вопрос 1: Это правильно? А во-вторых, каждый круг будет выполняться на отдельном потоке?
Нет, везде, где есть пересылающее соединение, эти операторы (кружки) будут работать в одном потоке (если вы не отключите цепочку операторов). Таким образом, источник и фильтр используют общий поток, а окно и приемник используют один и тот же поток.
Вопрос 2. Чтобы увеличить загрузку ЦП одним диспетчером задач, следует ли мне увеличить количество слотов для каждого диспетчера задач?
Да, вы можете сделать это как способ увеличения параллелизма без добавления дополнительного оборудования.
Вопрос 3. Приведет ли увеличение параллелизма к снижению показателя занятости?
Может быть. Зависит от того, что стало причиной затруднения. Это также может усугубить ситуацию.
Вопрос 4. Подходит ли эта функция для этой работы? Или будет более эффективный подход к использованию.
Я подозреваю, что RichAsyncFunc является причиной ваших проблем. Разве для вашей БД нет реального приемника? В общем, ответом является уменьшение количества операций записи (большими пакетами) в приемник.
Кроме того, действительно ли вам нужно использовать keyBy дважды: один раз между фильтром и картой, а затем снова между картой и окном? Это дорого.
Понятно, спасибо за разъяснения, Дэвид. С тех пор, как я написал этот вопрос, я провел множество исследований, в том числе полностью удалил RichAsyncFunc, чтобы исключить любые проблемы. В результате своего расследования я обнаружил, что когда аккумулятор в моей AggregatorFunction стал достаточно большим, метрика занятости резко возросла, и пропускная способность стала очень низкой. Когда я решил эту проблему, все резко ускорилось.