Оператор Flink завис на 100% занятости, как мне это отключить?

Я развернул кластер Flink как приложение пряжи. В рамках конфигурации пряжи я связал 32 виртуальных ядра с каждым диспетчером задач. Я также выделил по 2 слота для каждого диспетчера задач.

Конвейер заданий: источник Kafka > Фильтр > RichMapFunction > TumblingWindow (время обработки) > Приемник

Я установил для параметров «Параллелизм» и «Максимальный параллелизм» значение 8 и обеспечил, чтобы моя функция keyBy равномерно распределяла сообщения по каждой подзадаче, как показано на рисунке ниже:

Ниже представлена ​​визуализация того, как я представляю себе размещение параллельных экземпляров операторов. Основываясь на том, что я прочитал в документации Flink здесь.

Вопрос 1: Это правильно? А во-вторых, каждый круг будет выполняться на отдельном потоке?

Загрузка ЦП TaskManager едва достигает 7%, поэтому он явно не использует все доступные ядра.

Вопрос 2. Чтобы увеличить загрузку ЦП одним диспетчером задач, следует ли мне увеличить количество слотов для каждого диспетчера задач?

Вопрос 3. Приведет ли увеличение параллелизма к снижению показателя занятости?

Мой последний вопрос касается использования мной функции агрегатора в окне времени обработки переворачивающихся данных. Он имеет хеш-карту, в которой хранятся последние значения сообщений на основе их идентификаторов, где ключом является идентификатор, а значением сообщения. Затем выходные данные передаются в RichAsyncFunc, который сохраняет эти значения в БД. Длина окна составляет 200 мс, это позволяет мне отправлять только 4 запроса к БД в секунду для данного экземпляра приемника, в отличие от каждого сообщения, которого было бы слишком много.

Вопрос 4. Подходит ли эта функция для этой работы? Или будет более эффективный подход к использованию.

Спасибо за прочтение!

Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
0
107
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вопрос 1: Это правильно? А во-вторых, каждый круг будет выполняться на отдельном потоке?

Нет, везде, где есть пересылающее соединение, эти операторы (кружки) будут работать в одном потоке (если вы не отключите цепочку операторов). Таким образом, источник и фильтр используют общий поток, а окно и приемник используют один и тот же поток.

Вопрос 2. Чтобы увеличить загрузку ЦП одним диспетчером задач, следует ли мне увеличить количество слотов для каждого диспетчера задач?

Да, вы можете сделать это как способ увеличения параллелизма без добавления дополнительного оборудования.

Вопрос 3. Приведет ли увеличение параллелизма к снижению показателя занятости?

Может быть. Зависит от того, что стало причиной затруднения. Это также может усугубить ситуацию.

Вопрос 4. Подходит ли эта функция для этой работы? Или будет более эффективный подход к использованию.

Я подозреваю, что RichAsyncFunc является причиной ваших проблем. Разве для вашей БД нет реального приемника? В общем, ответом является уменьшение количества операций записи (большими пакетами) в приемник.

Кроме того, действительно ли вам нужно использовать keyBy дважды: один раз между фильтром и картой, а затем снова между картой и окном? Это дорого.

Понятно, спасибо за разъяснения, Дэвид. С тех пор, как я написал этот вопрос, я провел множество исследований, в том числе полностью удалил RichAsyncFunc, чтобы исключить любые проблемы. В результате своего расследования я обнаружил, что когда аккумулятор в моей AggregatorFunction стал достаточно большим, метрика занятости резко возросла, и пропускная способность стала очень низкой. Когда я решил эту проблему, все резко ускорилось.

raah 15.06.2024 21:15

Другие вопросы по теме