Я пытаюсь написать код как часть более крупного интеграционного теста.
У меня есть функция, которая создает KinesisSource
типа Source[KinesisRecord, Future[Done]
:
protected def createKinesisSource(config: KinesisClientLibConfiguration): Source[KinesisRecord, Future[Done]]#Repr[MyFile.KinesisFlow] = {
KinesisSource(config).mapAsync(1)(processRecord)
}
protected def processRecord(record: KinesisRecord): Future[KinesisFlow] = {
Future { validateRecord(record) }
}
Я хочу создать поддельный источник того же типа из файла. У меня есть (простите за код, я новичок в scala):
val records = scala.io.Source.fromFile("testFile.txt").getLines.toList
.map(i => {
new KinesisRecord( ByteString.fromString(i), "x", None, "x", None, Instant.now(), "x")
})
Source(records).mapAsync(1)(processRecord)
Это дает мне Source[KinesisRecord, NotUsed]
. Как я могу изменить это на Source[KinesisRecord, Future[Done]]
? Я понимаю, что второй параметр — это материализованное значение (увидеть этот пост), но я не уверен, как указать это значение без фактического применения функции run
.
Вы можете использовать mapMaterializedValue(_ => Future.successful(Done))
или Future.never
Но если материализованное значение повлияет на ваши интеграционные тесты, это повлияет и на них, потому что Будущее завершится сразу после материализации потока или никогда.