Я получаю Flow<A, B> (который представляет собой причудливую штуку с потоком / графиком, см. https://doc.akka.io/api/akka/current/akka/stream/scaladsl/Flow.html) из какого-то внешнего кода, неподконтрольного мне. Мне нужно обернуть этот поток и выполнить некоторую обработку для каждого элемента ввода и каждого элемента вывода. Я легко могу добиться этого, поместив поверх него BidiFlow следующим образом:
Flow<I, O, Unused> flow = ...; // external source
BidiFlow<I, I, O, O, Unused> bidi = BidiFlow.fromFunctions(i -> preprocess(i), o -> postprocess(o)); // do something on every input and every output
Flow<I, O, Unused> newFlow = bidi.join(flow);
Итак, вот поворот: чтобы правильно постобработать выходной элемент o, мне нужен вход, который сгенерировал этот выходной элемент. Поскольку у меня нет контроля над базовым потоком, я не могу выполнить его рефакторинг, чтобы вернуть, например, кортеж ввода и вывода. И из-за асинхронной и параллельной природы Akka я не могу делать никаких трюков, таких как сохранение ввода в локальном потоке или статическом поле или что-то подобное.
Итак, мой вопрос: есть ли какая-то магия Akka Streams, которую я могу применить, чтобы каким-то образом получить элемент ввода, который сгенерировал вывод?





Вы можете использовать Graph Api. Вы можете транслировать введенные вами данные в два потока: один для выполнения вашего процесса, а другой для обхода вашего идентификатора. Последняя задача должна состоять из этих потоков. Взгляните на Akka Streams / HTTP: получить исходный запрос из ответа. Может, это поможет.
не могли бы вы подробнее рассказать, как это применимо? Я действительно не понимаю ваш образец, образец работает с источниками (которые на самом деле не являются теми же типами, что и в моем случае, и я не знаю, как преобразовать их друг в друга) и использует операторы, которых я нигде не нашел в документации.
Транслируемые элементы будут в порядке после этапа zip, если вы не думаете, как фильтровать или включать асинхронные операции ввода-вывода или параллелизм в один из них, который может изменить порядок. В примере используется Graph API doc.akka.io/docs/akka/2.5/stream/stream-graphs.html
Вы правильно прочитали вопрос? Вы продолжаете ссылаться на примеры, в которых отсутствуют жизненно важные части. Как указано в моем исходном вопросе, мне нужно обернуть существующий поток, то есть у меня нет источника и приемника, с которым я могу работать. Кроме того, он может быть асинхронным, именно об этом и идет речь в исходном вопросе. И я знаю, что понимаю ваш комментарий как «он не работает с асинхронными потоками»
Это решение использования этапов GraphDsl, Broadcast и Zip.
val externalFlow: Flow[Int, String, NotUsed] = Flow[Int].map(i => i.toString + "-external")
def zipInAndOut[I, O](flow: Flow[I, O, NotUsed]): Flow[I, (I, O), NotUsed] = {
Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[I](2))
val zip = b.add(Zip[I, O])
val theFlow = b.add(flow)
broadcast.out(0) ~> zip.in0
broadcast.out(1) ~> theFlow ~> zip.in1
new FlowShape(broadcast.in, zip.out)
})
}
Source
.fromIterator(() => (1 until 10).iterator)
.via(zipInAndOut(externalFlow))
.runWith(Sink.foreach(println))
отпечатки
(1,1-external)
(2,2-external)
(3,3-external)
(4,4-external)
(5,5-external)
(6,6-external)
(7,7-external)
(8,8-external)
(9,9-external)
Хорошая идея. В настоящее время я изучаю это, и я пытаюсь понять код в комментарии к вашей ссылке. Однако я не совсем уверен, правильно ли это сжатие учитывает параллелизм? Я думаю, когда элементы транслируются, как можно связать правильные друг с другом, когда я снова их заархивирую?