Я реализую метод reduce
в Scala как упражнение по функциональному программированию в Scala, и не думаю, что он выполняется параллельно. Как создать его полностью параллельную реализацию?
def reduce[A](pas: IndexedSeq[A])(f: (A, A) => A): Par[A] =
if pas.isEmpty then throw new Exception("Can't reduce empty list")
else if pas.size == 1 then unit(pas.head)
else
val (l, r) = pas.splitAt(pas.size / 2)
reduce(l)(f).map2(reduce(r)(f))(f)
Я использую класс Par из книги:
object Par:
opaque type Par[A] = ExecutorService => Future[A]
extension [A](pa: Par[A]) def run(s: ExecutorService): Future[A] = pa(s)
def fork[A](a: => Par[A]): Par[A] =
es => es.submit(new Callable[A] { def call = a(es).get })
def lazyUnit[A](a: => A): Par[A] = fork(unit(a))
Что ж, все ваши вычисления завершаются без fork
, поэтому все работает линейно, ничего не работает параллельно. Ты пробовал
fork(reduce(l)(f)).map2(fork(reduce(r)(f)))(f)
или
else fork {
val (l, r) = pas.splitAt(pas.size / 2)
reduce(l)(f).map2(reduce(r)(f))(f)
}
?
Вы уверены, что это был тупик, а не просто бесконечная рекурсия, которая загромождала ваш пул потоков бесконечным количеством фьючерсов?
Я думаю, это может быть связано с тем, как я вызывал метод val es = Executors.newFixedThreadPool(10)
Par.reduce(1 to 1000)(_ + _).run(es)
Я изменил службу исполнителя на CachedThreadPool()
, и это сработало.
Большое спасибо за ваш ответ, это сработало. Я пробовал это, но у меня возникла проблема с вызовом функции с небольшим количеством потоков, что привело к тупику.