Я столкнулся с проблемой в Apache Storm
Сценарий проблемы:
Текущее решение:
перезапускаем worker из Storm UI.
Вопрос:
Можем ли мы перезапустить работника топологии программно?
У Storm есть два типа болтов: IRichBolt и IBasicBolt. Если вы реализуете IBasicBolt, вам также следует реализовать подтверждение. Также вы должны отправить Ack в свой болт, чтобы предотвратить блокировку. Эти ссылки хороши:
http://storm.apache.org/releases/1.0.6/Concepts.htmlhttp://storm.apache.org/releases/1.2.2/Guaranteeing-message-processing.html
Перезапуск рабочих процессов никогда не является хорошим решением, вы можете потерять некоторые кортежи. Лучше всего было бы использовать функцию надежности сообщений Storm, как ответил Рахим.
Однако, помимо того, что шторм надежности сообщений имеет внутренний механизм противодавления, это означает, что, когда Spout вводит больше данных, чем то, что Bolts может обработать, Spout автоматически замедляется.
Чтобы включить это, вам нужно сначала, как сказал Рахим, включить подтверждение. Это означает, что если ваша топология проста:
Носик -> Болт
Носик подойдет:
public void nextTuple(){
...
_collector.emit(new Values(tuple), tupleId);
}
@Override
publci void ack(Object msgId) { super.ack(msgId); }
Где tupleId - это просто инкрементный счетчик count++
. Таким образом вы объявляете Storm новый кортеж, ожидающий подтверждения.
Между тем в следующем болте и во всех последующих болтах в топологии или, по крайней мере, до того, который вызывает ваше узкое место, вы напишете:
public void execute(Tuple tuple){
...
_collector.emit(tuple, new Values(newTuple));
_collector.ack(tuple);
}
Таким образом вы заметите, что Storm полностью обработал кортеж.
Наименее, но не в последнюю очередь, в вашем основном методе, где вы объявляете построитель топологии, вы должны определить максимальное количество кортежей, которые будет ожидать Spout:
Config conf = new Config();
conf.setMaxSpoutPending(100);
Таким образом, Spout начнет создавать новые кортежи, ожидая их подтверждения, если (в этом случае) количество ожидающих кортежей превышает 100, spout перестанет вызывать метод nextTuple, ожидая их подтверждения и затем генерируя новые. .
Примечание: значение 100 - это просто пример, вам, возможно, придется немного настроить его, чтобы оптимизировать его для вашей ситуации.
Ссылки, которыми поделился Рахим, должно быть достаточно, чтобы понять механизм, в любом случае, если вы хотите глубже понять реализацию, я добавляю эту ссылку:
Можете ли вы рассказать об этом подробнее? или вы можете дать ссылку, которая это объясняет?