Я пытаюсь имитировать потоковый трафик, используя Kafka to Storm. Я использовал KafkaSpout, чтобы прочитать сообщение из одной темы, отправленное продюсером, который прочитал эти твиты и отправил их в тему. Моя проблема в том, что после того, как топология потребляет все твиты, отправленные в этой теме, она продолжает дважды читать сообщение в теме. Как я могу запретить KafkaSpout читать дважды? (Коэффициент репликации установлен на 1)
Спасибо за ответ. Я редактирую свой пост с этой информацией.
Не могли бы вы также опубликовать свою конфигурацию носика? Обратите внимание на несколько других вещей, которые вы, возможно, захотите изменить: Область действия storm-core должна быть «предоставлена», а не «скомпилирована». В Classifier.execute вы можете дважды подтвердить кортеж, если произойдет исключение. Вам нужно убедиться, что кортеж подтвержден только один раз, иначе Storm сочтет его неудачным и воспроизведет его повторно. Наконец, подумайте об обновлении до storm-kafka-client
, а также новой версии Kafka. 0.8.2.2 очень старая, а storm-kafka
не рекомендуется удалять.
Хорошо, спасибо, я думаю, что одна из ошибок дважды повторяется! Я попытался перейти на storm-kafka-client, но, похоже, он не может читать данные из темы. Я обновляю свой первый пост с kafkaSpoutCreator и его конфигурацией. Действительно спасибо за эту помощь, я на самом деле новичок в этих фреймворках.
Да, он не может прочитать данные, потому что Kafka слишком стар. storm-kafka-client
требуется Kafka 0.10.1.0 (насколько я помню). Но вы можете продолжать использовать storm-kafka
, если хотите, просто хотел убедиться, что вы знаете, что он будет удален со Storm 2.0.0. storm-kafka
также не совместим с Kafka после версии 2.0.0.
На самом деле я использую последнюю версию Apache Kafka (kafka_2.12-2.2.0), но она отлично работала со storm-kafka, пока не появились эти ошибки. Я обновил свой проект, вставив builder.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("127.0.0.1:9092", "tweet").build()), 1); но он не читает из потока Кафки.
Хорошо, это удивительно. Это также может быть проблемой, поскольку вы используете клиентский код с Kafka 2.x, который устарел. Смена хорошая. Также не забудьте добавить версию kafka-clients
в свой помпон. Вы также должны иметь возможность удалить зависимость kafka_2.9.2
. Наконец, вы, возможно, захотите знать о Issues.apache.org/jira/browse/STORM-3102, до Storm 1.2.3 (который, как мы ожидаем, будет выпущен в ближайшее время) есть проблема с производительностью Kafka 2.0.0.
Он выдает мне org.apache.kafka.common.errors.InvalidGroupIdException: чтобы использовать API-интерфейсы управления группой или фиксации смещения, вы должны указать действительный group.id в конфигурации потребителя.
Вам нужно вызвать setProp(ConsumerConfig.GROUP_ID_CONFIG, "your-group-name-here")
на KafkaSpoutConfig
. См. также github.com/apache/storm/blob/master/examples/… для полного примера.
хорошо, я попробую установить этот новый носик. Он кажется более сложным, чем предыдущий. В проекте нет ошибки компиляции, но я не могу проверить, решена ли проблема или нет. Я буду обновлять эту тему обновлениями, спасибо! Вы действительно улучшили мой день!
Рад помочь, надеюсь, у вас все получится.
Конфигурация выглядит хорошо для меня.
Возможно, проблема в двойном копировании. Убедитесь, что вы подтверждаете каждый кортеж только один раз в execute
.
Как упоминалось в комментарии, рассмотрите возможность перехода на более новую версию Kafka, а также о переходе на storm-kafka-client
.
Также кое-что, что может сделать вашу жизнь немного проще: рассмотрите возможность расширения BaseBasicBolt
вместо BaseRichBolt
. BaseBasicBolt
автоматически подтверждает кортеж, если запуск execute
не вызывает ошибку. Если вы хотите провалить кортеж, вы можете бросить FailedException
. BaseRichBolt
следует использовать только в том случае, если вы хотите выполнить более сложную проверку, например. объединение кортежей из многих execute
вызовов в памяти перед подтверждением.
Я заметил, что скорость потребления резко снижается в 4 раза! Он медленнее предыдущего. Есть ли какой-либо конкретный вариант для более быстрого получения сообщения?
Цитирую себя выше: «Наконец, вы, возможно, захотите узнать о Issues.apache.org/jira/browse/STORM-3102, до Storm 1.2.3 (который, как мы ожидаем, будет выпущен в ближайшее время) есть проблема с производительностью Kafka 2.0. 0". Вам необходимо обновиться до storm-kafka-client
1.2.3. Если вы не хотите ждать релиза, вы можете либо загрузить и собрать Storm самостоятельно, либо получить банку по адресу репозиторий.apache.org/content/repositories/orgapachestorm-1081/….
Извините, я должен дважды опубликовать свой ответ. Я попытался загрузить и поместить банку в банку проекта intellij, но я получаю исключение, вызванное: java.lang.ClassNotFoundException: com.google.common.base.Supplier, когда я запускаю топологию
Вместо этого вы хотите установить версию storm-kafka-client
в вашем POM на 1.2.3. Затем загрузите каталог «org» по адресу репозиторий.apache.org/content/repositories/orgapachestorm-1081 и поместите его в свой локальный репозиторий Maven (скорее всего, в свой пользовательский каталог ~/.m2/repository
). Это должно позволить IntelliJ/Maven забрать банки.
Вы также можете просто добавить URL-адрес репозиторий.apache.org/content/repositories/orgapachestorm-1081 в качестве репозитория в свой POM, как описано в maven.apache.org/guides/mini/guide-multiple-repositories.html, что может быть проще.
Все работает нормально (огромное спасибо), за исключением первой ошибки, которую я описал выше в первом посте. Кажется, есть дублированные или два раза проанализированные кортежи. Я обновился до BaseBasicBolt и удалил ack, и он был уменьшен, но все еще существует (например, у меня есть 600 тысяч твитов, созданных json tuple, и в конце я нахожу 650 тысяч твитов)
Я бы поискал повторы. Возможно, время ожидания некоторых из ваших кортежей истекло, и они повторяются. Попробуйте реализовать один из этих github.com/apache/storm/blob/v1.2.3/external/storm-kafka-client/…. Вы можете добавить его в файл KafkaSpoutConfig. Реализуйте метод onRetry для регистрации чего-либо. Если это сообщение регистрируется при запуске, значит, какой-то кортеж не прошел и воспроизводится повторно.
Не могли бы вы объяснить мне, как? Я использую конфигурацию из примера, предоставленного Storm (я отредактирую первый пост, потому что он слишком длинный), для чтения из потока kafka.
Создайте новый класс, реализующий интерфейс, который я связал. Сделайте так, чтобы метод onRetry в этом классе регистрировал сообщение при его вызове. Затем создайте новый экземпляр этого класса и зарегистрируйте его в KafkaSpoutConfig, вызвав этот метод github.com/apache/storm/blob/v1.2.3/external/storm-kafka-client/….
Хорошо, кажется, все работает нормально. Я запускаю свои топологии в локальном режиме, и на самом деле они достигают максимального размера, анализируя 1 миллион кортежей (после этого обрабатывают только 1000 кортежей каждые 10 минут), поэтому я думаю, что мне нужно обрабатывать в кластере.
Как вы сказали, метод onRetry показывает, что некоторые кортежи не обрабатываются и обрабатываются дважды. Как я могу остановить это поведение?
Это зависит от того, что вы хотите, чтобы произошло. Если это связано с истечением времени ожидания, вы можете ограничить количество кортежей, которые Storm будет пытаться обрабатывать за раз. См. github.com/apache/storm/blob/master/docs/…. Вы устанавливаете его в объекте Config, который вы передаете StormSubmitter. Затем вам следует избегать тайм-аутов и получать дубликаты только в том случае, если кортеж действительно не обрабатывается.
Если, с другой стороны, вы не хотите повторять кортежи в случае сбоя (и допускаете некоторую вероятность потери сообщения), вы можете использовать github.com/apache/storm/blob/v1.2.3/external/storm-kafka-client/… в своем KafkaSpoutConfig.
Хорошо, я решил свою проблему с этими настройками. Действительно, действительно спасибо. Я вставлю тебя в благодарность за диссертацию :D
Прежде всего, убедитесь, что вы используете последнюю версию Storm. Если это по-прежнему не работает, опубликуйте свою конфигурацию топологии (подключение топологии, возможно, также pom.xml).