Я просматривал документацию по Consumer API для Kafka в Alpakka. Я наткнулся на этот кусок кода. Насколько я понимаю, смещение фиксируется с помощью msg.committableOffset(). Тогда зачем нам нужны .toMat() и mapMaterializedValue(). Могу ли я просто прикрепить его к Sink.Ignore()?
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
.mapAsync(
1,
msg ->
business(msg.record().key(), msg.record().value())
.thenApply(done -> msg.committableOffset()))
.toMat(Committer.sink(committerSettings.withMaxBatch(1)), Keep.both())
.mapMaterializedValue(Consumer::createDrainingControl)
.run(materializer);





Вы не можете подключиться к Sink.ignore, потому что вы уже подключили Commiter.Sink. Но вы можете отказаться от материализованных значений.
В примере используется toMat с Keep.both для сохранения обоих материализованных значений: Control из источника и Future[Done] из приемника. С обоими значениями он создает DrainingControl в mapMaterializedValue, который позволяет вам остановить поток или слить поток перед остановкой или получить уведомление об остановке потока.
Если вам не нужен этот элемент управления (хотя вы должны), вы можете использовать
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
.mapAsync(
1,
msg ->
business(msg.record().key(), msg.record().value())
.thenApply(done -> msg.committableOffset()))
.to(Committer.sink(committerSettings.withMaxBatch(1)))
.run(materializer);
Спасибо за объяснение. Как вы упомянули, DrainingControl позволяет мне остановить поток или слить его. Если я не реализую это, не будут ли новые сообщения Kafka в теме выбраны потребителем? Я думаю, я пытаюсь выяснить, как используется стокКонтроль.