SchemaException — ошибка при чтении поля «leader_id»: длина строки -1 не может быть отрицательной — концентраторы событий Azure — Kafka

Я получаю это исключение

org.apache.kafka.common.protocol.types.SchemaException

пока кафка перебалансирует

Это детали:

  1. Использование концентраторов событий Azure. Доступ к нему с помощью API kafka

  2. "Kafka Enabled" = да, на лазурном портале

  3. использование: группа компиляции: «org.apache.kafka», имя: «kafka-clients», версия: «1.0.2»

  4. Использование потребительской группы

  5. Properties properties = new Properties();
    properties.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    properties.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);
    properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, String.format("%s.servicebus.windows.net:9093", this.namespace));
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MeasurementDeSerializer.class.getName());
    properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120000);
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, this.groupName);
    properties.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
    

У меня 2 клиента на 2 разных компах

Когда они оба запускаются, каждый получает 16 разделов из 32 доступных разделов.

Когда я отключаю один из них, все части перебалансируются на другой.

на экземпляре, который все еще работает, я получу:

  1. Разделы отозваны [16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31]

  2. Затем из цикла пула я получу это исключение:

    org.apache.kafka.common.protocol.types.SchemaException: ошибка чтения поля 'leader_id': длина строки -1 не может быть отрицательной на org.apache.kafka.common.protocol.types.Schema.read(Schema.java:76) на org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:279) в org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:586) в org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:686) на org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:469) на org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:258) на org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:230) на org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:190) на org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364) в org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) на org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295) на org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146) на org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)

С другой стороны, при движении в другую сторону проблем нет.

  1. Запустить первый экземпляр

  2. Экземпляр 1 получает все 32 раздела

  3. Запустить экземпляр 2

  4. Начинается ребалансировка

  5. Экземпляр 1 теряет 16 деталей

  6. экземпляр 2 получает 16 частей

Любая идея, что может вызвать это исключение?

Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
0
0
344
1

Ответы 1

Для будущих читателей - проблема исправлена. https://github.com/Azure/azure-event-hubs-for-kafka/issues/41

Другие вопросы по теме

Можно ли настроить концентратор событий Azure для сохранения сообщения, если Функция Azure не сможет его обработать?
Как предоставить prometheus метрики, хранящиеся в лазурных таблицах или концентраторе событий?
Можем ли мы обрабатывать мультимедийные файлы, такие как изображения и видео, через концентраторы событий Azure?
Концентратор событий Azure/функция Azure — синглтон
Как получить сведения об экземпляре концентратора событий Microsoft Azure, такие как строка подключения концентратора событий, из нашего приложения
Запись сообщений из концентратора событий в NewRelic занимает много времени (более 6 часов)
Маршрутизация сообщений из концентратора событий Azure в служебную шину Azure
Запуск функции Azure с помощью концентратора событий Azure, содержащего некоторое свойство пользователя
Концентраторы событий Azure Запись в хранилище с включенным Data Lake Gen2
Azure EventHub и устойчивые функции