Кафка Носик дважды прочитал сообщение о топологии шторма

Я пытаюсь имитировать потоковый трафик, используя Kafka to Storm. Я использовал KafkaSpout, чтобы прочитать сообщение из одной темы, отправленное продюсером, который прочитал эти твиты и отправил их в тему. Моя проблема в том, что после того, как топология потребляет все твиты, отправленные в этой теме, она продолжает дважды читать сообщение в теме. Как я могу запретить KafkaSpout читать дважды? (Коэффициент репликации установлен на 1)

Прежде всего, убедитесь, что вы используете последнюю версию Storm. Если это по-прежнему не работает, опубликуйте свою конфигурацию топологии (подключение топологии, возможно, также pom.xml).

Stig Rohde Døssing 24.05.2019 21:26

Спасибо за ответ. Я редактирую свой пост с этой информацией.

Marco Domenicano 24.05.2019 22:31

Не могли бы вы также опубликовать свою конфигурацию носика? Обратите внимание на несколько других вещей, которые вы, возможно, захотите изменить: Область действия storm-core должна быть «предоставлена», а не «скомпилирована». В Classifier.execute вы можете дважды подтвердить кортеж, если произойдет исключение. Вам нужно убедиться, что кортеж подтвержден только один раз, иначе Storm сочтет его неудачным и воспроизведет его повторно. Наконец, подумайте об обновлении до storm-kafka-client, а также новой версии Kafka. 0.8.2.2 очень старая, а storm-kafka не рекомендуется удалять.

Stig Rohde Døssing 24.05.2019 23:31

Хорошо, спасибо, я думаю, что одна из ошибок дважды повторяется! Я попытался перейти на storm-kafka-client, но, похоже, он не может читать данные из темы. Я обновляю свой первый пост с kafkaSpoutCreator и его конфигурацией. Действительно спасибо за эту помощь, я на самом деле новичок в этих фреймворках.

Marco Domenicano 24.05.2019 23:36

Да, он не может прочитать данные, потому что Kafka слишком стар. storm-kafka-client требуется Kafka 0.10.1.0 (насколько я помню). Но вы можете продолжать использовать storm-kafka, если хотите, просто хотел убедиться, что вы знаете, что он будет удален со Storm 2.0.0. storm-kafka также не совместим с Kafka после версии 2.0.0.

Stig Rohde Døssing 24.05.2019 23:41

На самом деле я использую последнюю версию Apache Kafka (kafka_2.12-2.2.0), но она отлично работала со storm-kafka, пока не появились эти ошибки. Я обновил свой проект, вставив builder.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("127.0.0.1:9092", "tweet").build()), 1); но он не читает из потока Кафки.

Marco Domenicano 24.05.2019 23:59

Хорошо, это удивительно. Это также может быть проблемой, поскольку вы используете клиентский код с Kafka 2.x, который устарел. Смена хорошая. Также не забудьте добавить версию kafka-clients в свой помпон. Вы также должны иметь возможность удалить зависимость kafka_2.9.2. Наконец, вы, возможно, захотите знать о Issues.apache.org/jira/browse/STORM-3102, до Storm 1.2.3 (который, как мы ожидаем, будет выпущен в ближайшее время) есть проблема с производительностью Kafka 2.0.0.

Stig Rohde Døssing 25.05.2019 00:06

Он выдает мне org.apache.kafka.common.errors.InvalidGroupIdException: чтобы использовать API-интерфейсы управления группой или фиксации смещения, вы должны указать действительный group.id в конфигурации потребителя.

Marco Domenicano 25.05.2019 00:09

Вам нужно вызвать setProp(ConsumerConfig.GROUP_ID_CONFIG, "your-group-name-here") на KafkaSpoutConfig. См. также github.com/apache/storm/blob/master/examples/… для полного примера.

Stig Rohde Døssing 25.05.2019 00:11

хорошо, я попробую установить этот новый носик. Он кажется более сложным, чем предыдущий. В проекте нет ошибки компиляции, но я не могу проверить, решена ли проблема или нет. Я буду обновлять эту тему обновлениями, спасибо! Вы действительно улучшили мой день!

Marco Domenicano 25.05.2019 00:29

Рад помочь, надеюсь, у вас все получится.

Stig Rohde Døssing 25.05.2019 00:42
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
11
188
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Конфигурация выглядит хорошо для меня.

Возможно, проблема в двойном копировании. Убедитесь, что вы подтверждаете каждый кортеж только один раз в execute.

Как упоминалось в комментарии, рассмотрите возможность перехода на более новую версию Kafka, а также о переходе на storm-kafka-client.

Также кое-что, что может сделать вашу жизнь немного проще: рассмотрите возможность расширения BaseBasicBolt вместо BaseRichBolt. BaseBasicBolt автоматически подтверждает кортеж, если запуск execute не вызывает ошибку. Если вы хотите провалить кортеж, вы можете бросить FailedException. BaseRichBolt следует использовать только в том случае, если вы хотите выполнить более сложную проверку, например. объединение кортежей из многих execute вызовов в памяти перед подтверждением.

Я заметил, что скорость потребления резко снижается в 4 раза! Он медленнее предыдущего. Есть ли какой-либо конкретный вариант для более быстрого получения сообщения?

Marco Domenicano 25.05.2019 10:33

Цитирую себя выше: «Наконец, вы, возможно, захотите узнать о Issues.apache.org/jira/browse/STORM-3102, до Storm 1.2.3 (который, как мы ожидаем, будет выпущен в ближайшее время) есть проблема с производительностью Kafka 2.0. 0". Вам необходимо обновиться до storm-kafka-client 1.2.3. Если вы не хотите ждать релиза, вы можете либо загрузить и собрать Storm самостоятельно, либо получить банку по адресу репозиторий.apache.org/content/repositories/orgapachestorm-10‌​81/….

Stig Rohde Døssing 25.05.2019 10:47

Извините, я должен дважды опубликовать свой ответ. Я попытался загрузить и поместить банку в банку проекта intellij, но я получаю исключение, вызванное: java.lang.ClassNotFoundException: com.google.common.base.Supplier, когда я запускаю топологию

Marco Domenicano 25.05.2019 11:05

Вместо этого вы хотите установить версию storm-kafka-client в вашем POM на 1.2.3. Затем загрузите каталог «org» по адресу репозиторий.apache.org/content/repositories/orgapachestorm-10‌​81 и поместите его в свой локальный репозиторий Maven (скорее всего, в свой пользовательский каталог ~/.m2/repository). Это должно позволить IntelliJ/Maven забрать банки.

Stig Rohde Døssing 25.05.2019 11:25

Вы также можете просто добавить URL-адрес репозиторий.apache.org/content/repositories/orgapachestorm-10‌​81 в качестве репозитория в свой POM, как описано в maven.apache.org/guides/mini/guide-multiple-repositories.htm‌​l, что может быть проще.

Stig Rohde Døssing 25.05.2019 11:26

Все работает нормально (огромное спасибо), за исключением первой ошибки, которую я описал выше в первом посте. Кажется, есть дублированные или два раза проанализированные кортежи. Я обновился до BaseBasicBolt и удалил ack, и он был уменьшен, но все еще существует (например, у меня есть 600 тысяч твитов, созданных json tuple, и в конце я нахожу 650 тысяч твитов)

Marco Domenicano 25.05.2019 11:50

Я бы поискал повторы. Возможно, время ожидания некоторых из ваших кортежей истекло, и они повторяются. Попробуйте реализовать один из этих github.com/apache/storm/blob/v1.2.3/external/storm-kafka-cli‌​ent/…. Вы можете добавить его в файл KafkaSpoutConfig. Реализуйте метод onRetry для регистрации чего-либо. Если это сообщение регистрируется при запуске, значит, какой-то кортеж не прошел и воспроизводится повторно.

Stig Rohde Døssing 25.05.2019 12:00

Не могли бы вы объяснить мне, как? Я использую конфигурацию из примера, предоставленного Storm (я отредактирую первый пост, потому что он слишком длинный), для чтения из потока kafka.

Marco Domenicano 25.05.2019 12:14

Создайте новый класс, реализующий интерфейс, который я связал. Сделайте так, чтобы метод onRetry в этом классе регистрировал сообщение при его вызове. Затем создайте новый экземпляр этого класса и зарегистрируйте его в KafkaSpoutConfig, вызвав этот метод github.com/apache/storm/blob/v1.2.3/external/storm-kafka-cli‌​ent/….

Stig Rohde Døssing 25.05.2019 12:24

Хорошо, кажется, все работает нормально. Я запускаю свои топологии в локальном режиме, и на самом деле они достигают максимального размера, анализируя 1 миллион кортежей (после этого обрабатывают только 1000 кортежей каждые 10 минут), поэтому я думаю, что мне нужно обрабатывать в кластере.

Marco Domenicano 25.05.2019 14:12

Как вы сказали, метод onRetry показывает, что некоторые кортежи не обрабатываются и обрабатываются дважды. Как я могу остановить это поведение?

Marco Domenicano 25.05.2019 14:26

Это зависит от того, что вы хотите, чтобы произошло. Если это связано с истечением времени ожидания, вы можете ограничить количество кортежей, которые Storm будет пытаться обрабатывать за раз. См. github.com/apache/storm/blob/master/docs/…. Вы устанавливаете его в объекте Config, который вы передаете StormSubmitter. Затем вам следует избегать тайм-аутов и получать дубликаты только в том случае, если кортеж действительно не обрабатывается.

Stig Rohde Døssing 25.05.2019 15:00

Если, с другой стороны, вы не хотите повторять кортежи в случае сбоя (и допускаете некоторую вероятность потери сообщения), вы можете использовать github.com/apache/storm/blob/v1.2.3/external/storm-kafka-cli‌​ent/… в своем KafkaSpoutConfig.

Stig Rohde Døssing 25.05.2019 15:00

Хорошо, я решил свою проблему с этими настройками. Действительно, действительно спасибо. Я вставлю тебя в благодарность за диссертацию :D

Marco Domenicano 26.05.2019 00:20

Другие вопросы по теме