Я пытаюсь использовать AKKA для потребления из темы kafka и отправки в Kinesis Stream с помощью KPL.
Я создал источник для темы кафки.
ActorSystem actorSystem = ActorSystemFactory.getClassicActorSystem();
KafkaService kafkaService = new kafkaService(actorSystem);
Source<Message, Consumer.Control> kafkaSource = kakfaService.createSource(runtimeConfiguration.getSourceKafkaTopic());
Теперь я хотел бы отправить записи в Kinesis Stream, используя какой-либо приемник. Проблема с «родным» стоком akka kinesis заключается в том, что он не поддерживает KPL. Я пытаюсь использовать эту зависимость
<!-- https://mvnrepository.com/artifact/com.github.j5ik2o/akka-kinesis-kpl -->
<dependency>
<groupId>com.github.j5ik2o</groupId>
<artifactId>akka-kinesis-kpl_2.13</artifactId>
<version>1.0.252</version>
</dependency>
Но только три класса могут иметь отношение к моей цели. KPLFlow, KPLFlowSettings и KPLFlowStage.
Я никогда не использовал потоки в akka, есть ли способ создать сток из потока или, самое большее, настроить поток для отправки в правильный поток кинезиса?
KPLFlowStage
extends GraphStageWithMaterializedValue[FlowShape[UserRecord, UserRecordResult], Future[KinesisProducer]]
Из-за возни с этим похоже, что KPLFlow ожидает UserRecord, это будет означать, что запись уже готова к отправке в поток kinesis.





Нет опыта работы с Kinesis, но вы можете превратить Flow<In, Out, Mat> (например, Flow<UserRecord, UserRecordResult, Future<KinesisProducer>>) в Sink<In, Pair<Out, CompletionStage<Done>> с помощью
Sink<UserRecord, Pair<scala.concurrent.Future<KinesisProducer>, CompletionStage<Done>>> sink =
flow.toMat(Sink.ignore(), Keep.both())
Поскольку KPLFlowStage материализуется как Scala Future и использует библиотеку Scala 2.13, это должно дать вам пару CompletionStage:
import scala.jdk.javaapi.FutureConverters
Sink<UserRecord, Pair<CompletionStage<KinesisProducer>, CompletionStage<Done>>> sink =
flow.mapMaterializedValue(scalaFuture -> FutureConverters.asJava(scalaFuture))
.toMat(Sink.ignore(), Keep.both())
Спасибо. Единственное, чего не хватало, это flow.asJava()...