Выключение ActorSystem в потоке akka

У меня есть поток akka, который постоянно потребляет данные из темы kafka. Я никогда не выключаю систему актеров, я не хочу, чтобы мое приложение завершало работу, правильно ли это? Как правильно обрабатывать завершение работы actorySystem?

  implicit val actorSystem: ActorSystem = ActorSystem("mytest")
  implicit val materializer: ActorMaterializer =
    ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))

  val actorConfig = actorSystem.settings.config.getConfig("akka.kafka.consumer")

  val consumerSettings =
    ConsumerSettings(actorConfig, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers(config.getString("kafka.hosts"))
      .withGroupId("mytestgrp")


  val flow = Consumer
    .atMostOnceSource(consumerSettings, Subscriptions.topics(config.getString("kafka.topic")))
    .grouped(500)
    .map(Pipeline.process)
    .withAttributes(supervisionStrategy(decider))

  flow.runWith(Sink.ignore)
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
0
0
250
1

Ответы 1

Когда поток завершится, вы можете закрыть систему акторов.

flow.runWith(Sink.ignore).onComplete {
    case _ => actorSystem.shutdown
}

Другие вопросы по теме