Как прозрачно связать элемент ввода с элементом вывода

Я получаю Flow<A, B> (который представляет собой причудливую штуку с потоком / графиком, см. https://doc.akka.io/api/akka/current/akka/stream/scaladsl/Flow.html) из какого-то внешнего кода, неподконтрольного мне. Мне нужно обернуть этот поток и выполнить некоторую обработку для каждого элемента ввода и каждого элемента вывода. Я легко могу добиться этого, поместив поверх него BidiFlow следующим образом:

Flow<I, O, Unused> flow = ...; // external source
BidiFlow<I, I, O, O, Unused> bidi = BidiFlow.fromFunctions(i -> preprocess(i), o -> postprocess(o)); // do something on every input and every output
Flow<I, O, Unused> newFlow = bidi.join(flow);

Итак, вот поворот: чтобы правильно постобработать выходной элемент o, мне нужен вход, который сгенерировал этот выходной элемент. Поскольку у меня нет контроля над базовым потоком, я не могу выполнить его рефакторинг, чтобы вернуть, например, кортеж ввода и вывода. И из-за асинхронной и параллельной природы Akka я не могу делать никаких трюков, таких как сохранение ввода в локальном потоке или статическом поле или что-то подобное.

Итак, мой вопрос: есть ли какая-то магия Akka Streams, которую я могу применить, чтобы каким-то образом получить элемент ввода, который сгенерировал вывод?

Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
5
0
152
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Вы можете использовать Graph Api. Вы можете транслировать введенные вами данные в два потока: один для выполнения вашего процесса, а другой для обхода вашего идентификатора. Последняя задача должна состоять из этих потоков. Взгляните на Akka Streams / HTTP: получить исходный запрос из ответа. Может, это поможет.

Хорошая идея. В настоящее время я изучаю это, и я пытаюсь понять код в комментарии к вашей ссылке. Однако я не совсем уверен, правильно ли это сжатие учитывает параллелизм? Я думаю, когда элементы транслируются, как можно связать правильные друг с другом, когда я снова их заархивирую?

loonytune 29.11.2018 13:56

не могли бы вы подробнее рассказать, как это применимо? Я действительно не понимаю ваш образец, образец работает с источниками (которые на самом деле не являются теми же типами, что и в моем случае, и я не знаю, как преобразовать их друг в друга) и использует операторы, которых я нигде не нашел в документации.

loonytune 29.11.2018 14:35

Транслируемые элементы будут в порядке после этапа zip, если вы не думаете, как фильтровать или включать асинхронные операции ввода-вывода или параллелизм в один из них, который может изменить порядок. В примере используется Graph API doc.akka.io/docs/akka/2.5/stream/stream-graphs.html

Emiliano Martinez 29.11.2018 17:49

Вы правильно прочитали вопрос? Вы продолжаете ссылаться на примеры, в которых отсутствуют жизненно важные части. Как указано в моем исходном вопросе, мне нужно обернуть существующий поток, то есть у меня нет источника и приемника, с которым я могу работать. Кроме того, он может быть асинхронным, именно об этом и идет речь в исходном вопросе. И я знаю, что понимаю ваш комментарий как «он не работает с асинхронными потоками»

loonytune 30.11.2018 12:05
Ответ принят как подходящий

Это решение использования этапов GraphDsl, Broadcast и Zip.

  val externalFlow: Flow[Int, String, NotUsed] = Flow[Int].map(i => i.toString + "-external")

  def zipInAndOut[I, O](flow: Flow[I, O, NotUsed]): Flow[I, (I, O), NotUsed] = {
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val broadcast = b.add(Broadcast[I](2))
      val zip = b.add(Zip[I, O])
      val theFlow = b.add(flow)
      broadcast.out(0) ~> zip.in0
      broadcast.out(1) ~> theFlow ~> zip.in1
      new FlowShape(broadcast.in, zip.out)
    })
  }
  Source
    .fromIterator(() => (1 until 10).iterator)
    .via(zipInAndOut(externalFlow))
    .runWith(Sink.foreach(println))

отпечатки

(1,1-external)
(2,2-external)
(3,3-external)
(4,4-external)
(5,5-external)
(6,6-external)
(7,7-external)
(8,8-external)
(9,9-external)

Другие вопросы по теме