Я создал этот пример groupBy
-> map
-> mergeSubstreamsWithParallelism
, используя потоки Akka. В курсе, который я делаю, говорится, что groupBy
создаст X подпотоков относительно параметра, который я ему передаю, а затем я должен объединить подпотоки в один поток. Итак, я понимаю, что оператор map
работает параллельно. Это правильно?
Если да, то почему я вижу тот же поток, выполняющий оператор map
в этом коде:
val textSource = Source(List(
"I love Akka streams", // odd
"this has even characters", // even
"this is amazing", // odd
"learning Akka at the Rock the JVM", // odd
"Let's rock the JVM", // even
"123", // odd
"1234" // even
))
val totalCharCountFuture = textSource
.groupBy(2, string => string.length % 2)
.map { c =>
println(s"I am running on thread [${Thread.currentThread().getId}]")
c.length
}// .async // this operator runs in parallel
.mergeSubstreamsWithParallelism(2)
.toMat(Sink.reduce[Int](_ + _))(Keep.right)
.run()
totalCharCountFuture.onComplete {
case Success(value) => println(s"total char count: $value")
case Failure(exception) => println(s"failed computation: $exception")
}
выход:
I am running on thread [16]
I am running on thread [16]
I am running on thread [16]
I am running on thread [16]
I am running on thread [16]
I am running on thread [16]
I am running on thread [16]
total char count: 116
затем я добавил .async
, чтобы оператор работал асинхронно. Затем мой вывод показывает разные потоки, выполняющие оператор map
:
I am running on thread [21]
I am running on thread [21]
I am running on thread [21]
I am running on thread [20]
I am running on thread [20]
I am running on thread [20]
I am running on thread [20]
Я прочитал документацию в документах Akka об асинхронной границе:
Поместите асинхронную границу вокруг этого потока. Если это подпоток (создается, например, groupBy), это создает асинхронную границу вокруг каждого материализованного подпотока, а не сверхпотока. Таким образом, суперпоток будет взаимодействовать с подпотоками асинхронно.
Итак, мне нужен .async
после groupBy
, чтобы убедиться, что все подпотоки выполняются параллельно или нет? Является ли этот тест, который я выполняю, подтверждением параллелизма оператора в потоке Akka?
Спасибо
Итак, мне нужен .async после groupBy, чтобы убедиться, что все подпотоки выполняются параллельно или нет? Является ли этот тест, который я выполняю, подтверждением параллелизма оператора в потоке Akka?
Короткий ответ: «да», вам нужно async
.
Как правило, в Akka Streams (и других реализациях спецификации реактивных потоков, таких как RxJava или Project Reactor) вам необходимо явно разграничить асинхронные границы. По умолчанию потоки выполняются с одним потоком (или с одним актором в случае Akka Streams). Это включает такие операторы, как groupBy
. Поначалу это может показаться немного нелогичным, но если подумать, параллельное выполнение на самом деле не является обязательным в семантике groupBy
, хотя часто вам нужно параллельное выполнение, потому что это та самая причина, по которой вы применяете groupBy
, будь то использование всех ядер. доступный для какой-либо вычислительной задачи или, возможно, для параллельного вызова какой-либо внешней службы и повышения пропускной способности. В этих случаях вам нужно явно закодировать этот параллелизм. Одним из способов является использование async
, как вы сделали в своем примере, где логика реализации выполнения потока вводит этот параллелизм, или вы также можете использовать mapAsync
, где параллелизм вводится некоторыми средствами, внешними по отношению к логике выполнения потока.