Я знаю, что могу преобразовать Seq[Future[T]] в Future[Seq[T]] через
val seqFuture = Future.sequence(seqOfFutures)
seqFuture.map((seqT: Seq[T]) => {...})
Моя проблема сейчас в том, что у меня есть 700 фьючерсов в этой последовательности, и я хочу иметь возможность контролировать, сколько из них решается параллельно, поскольку каждое будущее будет вызывать внутренний api отдыха, а одновременное выполнение 700 запросов похоже на запуск дос-атака на этот сервер.
Я предпочитаю решать только что-то вроде 10 фьючерсов за раз.
Как я могу этого добиться?
Пробуя ответ Паму, я вижу ошибку:
[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:44: com.dreamlines.commons.LazyFuture[A] does not take parameters
[error] val batch = Future.sequence(c.map(_()))
[error] ^
[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:28: no type parameters for method sequence: (in: M[scala.concurrent.Future[A]])(implicit cbf: scala.collection.generic.CanBuildFrom[M[scala.concurrent.Future[A]],A,M[A]], implicit executor: scala.concurrent.ExecutionContext)scala.concurrent.Future[M[A]] exist so that it can be applied to arguments (List[Nothing])
[error] --- because ---
[error] argument expression's type is not compatible with formal parameter type;
[error] found : List[Nothing]
[error] required: ?M[scala.concurrent.Future[?A]]
[error] val batch = Future.sequence(c.map(_()))
[error] ^
[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:42: type mismatch;
[error] found : List[Nothing]
[error] required: M[scala.concurrent.Future[A]]
[error] val batch = Future.sequence(c.map(_()))
[error] ^
[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:36: Cannot construct a collection of type M[A] with elements of type A based on a collection of type M[scala.concurrent.Future[A]].
[error] val batch = Future.sequence(c.map(_()))
[error] ^
[error] four errors found





Простой foldLeft можно использовать для управления количеством фьючерсов, которые выполняются одновременно.
Во-первых, давайте создадим класс case под названием LazyFuture
case class LazyFuture[+A](f: Unit => Future[A]) {
def apply() = f()
}
object LazyFuture {
def apply[A](f: => A)(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => Future(f))
def apply[A](f: => Future[A])(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => f)
}
LazyFuture немедленно останавливает запуск будущего
val list: List[LazyFuture[A]] = ...
list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])){ (r, c) =>
val batch = Future.sequence(c.map(_()))
batch.flatMap(values => r.map(rs => rs ++ values))
}
Измените concurFactor соответственно, чтобы запускать несколько фьючерсов одновременно.
concurFactor of 1 запустит одно будущее сразу
concurFactor of 2 будет запускать сразу два фьючерса
и так далее ...
def executeBatch[A](list: List[LazyFuture[A]])(concurFactor: Int) =
list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])){ (r, c) =>
val batch = Future.sequence(c.map(_()))
r.flatMap(rs => batch.map(values => rs ++ values))
}
case class LazyFuture[+A](f: Unit => Future[A]) {
def apply() = f()
}
object LazyFuture {
def apply[A](f: => A)(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => Future(f))
def apply[A](f: => Future[A])(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => f)
}
def executeBatch[A](list: List[LazyFuture[A]])(concurFactor: Int)(implicit ec: ExecutionContext): Future[List[A]] =
list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])) { (r, c) =>
val batch = Future.sequence(c.map(_ ()))
r.flatMap(rs => batch.map(values => rs ++ values))
}
Вы также можете ограничить вычислительные ресурсы, ограничив количество потоков в пуле выполнения. Но это решение не такое уж гибкое. Лично мне это не нравится.
val context: ExecutionContext =
ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8))
Вы должны не забыть передать правильный контекст выполнения, который является неявным значением. Иногда мы не знаем, какое неявное значение входит в область действия. Это глючит
Когда будущее построено, как показано ниже
val foo = Future {
1 + 2
} // future starts executing
LazyFuture(foo) // Not a right way
foo уже запущен, и его нельзя контролировать.
Правильный способ построения LazyFuture
val foo = LazyFuture {
1 + 2
}
или же
val foo = LazyFuture {
Future {
1 + 2
}
}
package main
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
object Main {
case class LazyFuture[A](f: Unit => Future[A]) {
def apply(): Future[A] = f()
}
object LazyFuture {
def apply[A](f: => A)(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => Future(f))
def apply[A](f: => Future[A]): LazyFuture[A] = LazyFuture(_ => f)
}
def executeBatch[A](list: List[LazyFuture[A]])(concurFactor: Int)
(implicit ec: ExecutionContext): Future[List[A]] =
list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])) { (r, c) =>
val batch = Future.sequence(c.map(_ ()))
r.flatMap(rs => r.map(values=> rs ++ values))
}
def main(args: Array[String]): Unit = {
import scala.concurrent.ExecutionContext.Implicits.global
val futures: Seq[LazyFuture[Int]] = List(1, 2, 3, 4, 5).map { value =>
LazyFuture {
println(s"value: $value started")
Thread.sleep(value * 200)
println(s"value: $value stopped")
value
}
}
val f = executeBatch(futures.toList)(2)
Await.result(f, Duration.Inf)
}
}
Что вы имеете в виду, говоря о том, что ограничение количества потоков пула выполнения не является гибким? Что я потеряю, пойдя в этом направлении?
@ k0pernikus Вы должны помнить о передаче правильного контекста выполнения, который является неявным значением. Иногда мы не знаем, какое неявное значение входит в область действия. Его багги
А как преобразовать List[Future[T]] в List[LazyFuture[T]]?
@ k0pernikus добавил конструктор. пожалуйста, посмотрите. Предупреждение: будущее должно быть неоцененным.
@ k0pernikus лучше использовать конструктор LazyFuture для построения вычисления
Просмотрите мой обновленный вопрос с трассировкой стека при использовании этого подхода, поскольку я вижу: Ошибка expression's type is not compatible with formal parameter type
Может быть, моя ошибка связана с тем, что я пытаюсь обернуть будущее внутри LazyFuture, тогда как я должен для начала создать LazyFuture?
@ k0pernikus Исправлены ошибки компиляции. ошибка была при объявлении LazyFuture
@pamu ваше решение не запускает точно фьючерсы n в данный момент. Когда вы начинаете обрабатывать группу, она полностью использует пул. Но после того, как несколько задач готовы, он не берет задачи из следующей группы. И последнее задание в группе будет выполняться в одиночку.
@simpadjo Ура! это зависит от сгруппированных. количество задач, разделенное на коэффициент совпадения, составляет сделку. если напоминание равно 0, то все в порядке, если не некоторые задачи, наконец, останутся запущенными. Но количество фьючерсов, работающих в данный момент времени, ограничено.
@pamu нет, проблема существует для любого размера группы, поскольку задачи не завершаются одновременно. У меня была такая же проблема, и я не нашел красивого и простого неблокирующего решения.
Это ведет себя не так, как ожидалось. Похоже, что на самом деле он не ждет завершения 10 фьючерсов, и после 70 LazyFutre он просто останавливается.
Параллелизм в Scala Futures управляется ExecutionContext. Обратите внимание, что фьючерсы начинают выполняться в контексте сразу после создания, поэтому ExecutionContext из Future.sequence не имеет особого значения. Вы должны предоставить соответствующий контекст при создании исходных фьючерсов из последовательности.
Контекст по умолчанию ExecutionContext.global (обычно импортируемый через import scala.concurrent.ExecutionContext.Implicits.global) использует столько потоков, сколько имеется ядер процессора, но он также может создавать множество дополнительных потоков для блокирующих задач, которые заключены в scala.concurrent.blocking. Обычно это желаемое поведение, но оно не подходит для вашей проблемы.
К счастью, вы можете использовать метод ExecutionContext.fromExecutor для обертывания пула потоков Java. Например:
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
val context = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
val seqOfFutures = Seq.fill(700)(Future { callRestApi() }(context))
val sequenceFuture = Future.sequence(seqOfFutures)(ExecutionContext.global)
Конечно, контекст также может быть предоставлен неявно:
implicit val context: ExecutionContext =
ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
val seqOfFutures = Seq.fill(700)(Future { callRestApi() })
// This `sequence` uses the same thread pool as the original futures
val sequenceFuture = Future.sequence(seqOfFutures)
Посмотрите, как можно регулировать частоту запросов с помощью асинхронного HTTP-клиента (Play WS). То же самое можно применить к Akka Http Client: stackoverflow.com/questions/37259206/…