Как программно перезапустить worker для топологии в Apache Storm

Я столкнулся с проблемой в Apache Storm

Сценарий проблемы:

  1. Когда небольшие данные отправляются в Storm, данные обрабатываются правильно по топологии (имеет только 1 рабочий поток) и отдаются дальше для сохранения в MongoDB.
  2. Но когда данные огромны, он обрабатывает данные и сохраняется в БД, но не принимает впоследствии никаких других данных, независимо от того, большие они или маленькие.

Текущее решение:

перезапускаем worker из Storm UI.

Вопрос:

Можем ли мы перезапустить работника топологии программно?

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
0
548
2

Ответы 2

У Storm есть два типа болтов: IRichBolt и IBasicBolt. Если вы реализуете IBasicBolt, вам также следует реализовать подтверждение. Также вы должны отправить Ack в свой болт, чтобы предотвратить блокировку. Эти ссылки хороши:

http://storm.apache.org/releases/1.0.6/Concepts.htmlhttp://storm.apache.org/releases/1.2.2/Guaranteeing-message-processing.html

Можете ли вы рассказать об этом подробнее? или вы можете дать ссылку, которая это объясняет?

Ankur Soni 13.10.2018 12:52

Перезапуск рабочих процессов никогда не является хорошим решением, вы можете потерять некоторые кортежи. Лучше всего было бы использовать функцию надежности сообщений Storm, как ответил Рахим.

Однако, помимо того, что шторм надежности сообщений имеет внутренний механизм противодавления, это означает, что, когда Spout вводит больше данных, чем то, что Bolts может обработать, Spout автоматически замедляется.

Чтобы включить это, вам нужно сначала, как сказал Рахим, включить подтверждение. Это означает, что если ваша топология проста:

Носик -> Болт

Носик подойдет:

public void nextTuple(){
  ...
  _collector.emit(new Values(tuple), tupleId);
}

@Override
publci void ack(Object msgId) { super.ack(msgId); }

Где tupleId - это просто инкрементный счетчик count++. Таким образом вы объявляете Storm новый кортеж, ожидающий подтверждения.

Между тем в следующем болте и во всех последующих болтах в топологии или, по крайней мере, до того, который вызывает ваше узкое место, вы напишете:

public void execute(Tuple tuple){
  ...
  _collector.emit(tuple, new Values(newTuple));
  _collector.ack(tuple);
}

Таким образом вы заметите, что Storm полностью обработал кортеж.

Наименее, но не в последнюю очередь, в вашем основном методе, где вы объявляете построитель топологии, вы должны определить максимальное количество кортежей, которые будет ожидать Spout:

Config conf = new Config();
conf.setMaxSpoutPending(100);

Таким образом, Spout начнет создавать новые кортежи, ожидая их подтверждения, если (в этом случае) количество ожидающих кортежей превышает 100, spout перестанет вызывать метод nextTuple, ожидая их подтверждения и затем генерируя новые. .

Примечание: значение 100 - это просто пример, вам, возможно, придется немного настроить его, чтобы оптимизировать его для вашей ситуации.

Ссылки, которыми поделился Рахим, должно быть достаточно, чтобы понять механизм, в любом случае, если вы хотите глубже понять реализацию, я добавляю эту ссылку:

Другие вопросы по теме