Я хочу задушить потребителей Kafka своей работой в Flink.
Просматривая исходный код Flink 1.12, я нахожу FlinkConnectorRateLimiter
и GuavaFlinkConnectorRateLimiter
. Но я не могу найти ничего, связывающего этот ограничитель скорости с FlinkKafkaConsumer
.
Как я могу реализовать ограничение скорости для Kafka в Flink 1.12?
FlinkConnectorRateLimiter
был доступен с устаревшим потребителем Kafka (flink-connector-kafka-0.10), который был удален в Flink 1.12. Текущий потребитель kafka не предлагает ограничение скорости.
См. эту ветку списка рассылки -- https://lists.apache.org/thread/j7kw131jn0ozmrj763j0hr87b1rj7jop -- для обсуждения. Короче говоря, ограничение скорости больше не должно иметь никакой привлекательности после завершения текущих улучшений контрольных точек при противодавлении и перекосе времени события, поэтому на самом деле нет никакого желания добавлять поддержку ограничения скорости.
Тем не менее, в ветке списка рассылки выше есть пример, показывающий, как реализовать ограничение скорости для Kafka самостоятельно, расширив FlinkKafkaConsumer
, чтобы переопределить emitRecord
и emitRecordWithTimestamp
.
Обратите внимание, что вы должны быть осторожны, чтобы никогда не блокировать контрольные точки, что означает, что вы должны избегать ожидания в основном потоке обработки. Схемы десериализации выполняются в другом потоке, поэтому это лучшее место для ограничения скорости.
Вы ссылались на это?
Да. Спасибо; Я обновил ссылку.
Ссылка, кажется, битая. Не могли бы вы обновить его?