Создает ли groupBy в потоке Akka параллельные подпотоки?

Я создал этот пример groupBy -> map -> mergeSubstreamsWithParallelism, используя потоки Akka. В курсе, который я делаю, говорится, что groupBy создаст X подпотоков относительно параметра, который я ему передаю, а затем я должен объединить подпотоки в один поток. Итак, я понимаю, что оператор map работает параллельно. Это правильно?

Если да, то почему я вижу тот же поток, выполняющий оператор map в этом коде:

val textSource = Source(List(
  "I love Akka streams", // odd
  "this has even characters", // even
  "this is amazing", // odd
  "learning Akka at the Rock the JVM", // odd
  "Let's rock the JVM", // even
  "123", // odd
  "1234" // even
))
val totalCharCountFuture = textSource
  .groupBy(2, string => string.length % 2)
  .map { c =>
    println(s"I am running on thread [${Thread.currentThread().getId}]")
    c.length
  }// .async // this operator runs in parallel
  .mergeSubstreamsWithParallelism(2)
  .toMat(Sink.reduce[Int](_ + _))(Keep.right)
  .run()
totalCharCountFuture.onComplete {
  case Success(value) => println(s"total char count: $value")
  case Failure(exception) => println(s"failed computation: $exception")
}

выход:

I am running on thread [16]
I am running on thread [16]
I am running on thread [16]
I am running on thread [16]
I am running on thread [16]
I am running on thread [16]
I am running on thread [16]
total char count: 116

затем я добавил .async, чтобы оператор работал асинхронно. Затем мой вывод показывает разные потоки, выполняющие оператор map:

I am running on thread [21]
I am running on thread [21]
I am running on thread [21]
I am running on thread [20]
I am running on thread [20]
I am running on thread [20]
I am running on thread [20]

Я прочитал документацию в документах Akka об асинхронной границе:

Поместите асинхронную границу вокруг этого потока. Если это подпоток (создается, например, groupBy), это создает асинхронную границу вокруг каждого материализованного подпотока, а не сверхпотока. Таким образом, суперпоток будет взаимодействовать с подпотоками асинхронно.

Итак, мне нужен .async после groupBy, чтобы убедиться, что все подпотоки выполняются параллельно или нет? Является ли этот тест, который я выполняю, подтверждением параллелизма оператора в потоке Akka?

Спасибо

Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
0
781
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Итак, мне нужен .async после groupBy, чтобы убедиться, что все подпотоки выполняются параллельно или нет? Является ли этот тест, который я выполняю, подтверждением параллелизма оператора в потоке Akka?

Короткий ответ: «да», вам нужно async.

Как правило, в Akka Streams (и других реализациях спецификации реактивных потоков, таких как RxJava или Project Reactor) вам необходимо явно разграничить асинхронные границы. По умолчанию потоки выполняются с одним потоком (или с одним актором в случае Akka Streams). Это включает такие операторы, как groupBy. Поначалу это может показаться немного нелогичным, но если подумать, параллельное выполнение на самом деле не является обязательным в семантике groupBy, хотя часто вам нужно параллельное выполнение, потому что это та самая причина, по которой вы применяете groupBy, будь то использование всех ядер. доступный для какой-либо вычислительной задачи или, возможно, для параллельного вызова какой-либо внешней службы и повышения пропускной способности. В этих случаях вам нужно явно закодировать этот параллелизм. Одним из способов является использование async, как вы сделали в своем примере, где логика реализации выполнения потока вводит этот параллелизм, или вы также можете использовать mapAsync, где параллелизм вводится некоторыми средствами, внешними по отношению к логике выполнения потока.

Другие вопросы по теме