Как контролировать параллелизм future.sequence в scala?

Я знаю, что могу преобразовать Seq[Future[T]] в Future[Seq[T]] через

  val seqFuture = Future.sequence(seqOfFutures)
  seqFuture.map((seqT: Seq[T]) => {...})

Моя проблема сейчас в том, что у меня есть 700 фьючерсов в этой последовательности, и я хочу иметь возможность контролировать, сколько из них решается параллельно, поскольку каждое будущее будет вызывать внутренний api отдыха, а одновременное выполнение 700 запросов похоже на запуск дос-атака на этот сервер.

Я предпочитаю решать только что-то вроде 10 фьючерсов за раз.

Как я могу этого добиться?


Пробуя ответ Паму, я вижу ошибку:

[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:44: com.dreamlines.commons.LazyFuture[A] does not take parameters
[error]         val batch = Future.sequence(c.map(_()))
[error]                                            ^
[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:28: no type parameters for method sequence: (in: M[scala.concurrent.Future[A]])(implicit cbf: scala.collection.generic.CanBuildFrom[M[scala.concurrent.Future[A]],A,M[A]], implicit executor: scala.concurrent.ExecutionContext)scala.concurrent.Future[M[A]] exist so that it can be applied to arguments (List[Nothing])
[error]  --- because ---
[error] argument expression's type is not compatible with formal parameter type;
[error]  found   : List[Nothing]
[error]  required: ?M[scala.concurrent.Future[?A]]
[error]         val batch = Future.sequence(c.map(_()))
[error]                            ^
[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:42: type mismatch;
[error]  found   : List[Nothing]
[error]  required: M[scala.concurrent.Future[A]]
[error]         val batch = Future.sequence(c.map(_()))
[error]                                          ^
[error] /home/philipp/src/bluebat/src/main/scala/com/dreamlines/metronome/service/JobFetcher.scala:32:36: Cannot construct a collection of type M[A] with elements of type A based on a collection of type M[scala.concurrent.Future[A]].
[error]         val batch = Future.sequence(c.map(_()))
[error]                                    ^
[error] four errors found

Посмотрите, как можно регулировать частоту запросов с помощью асинхронного HTTP-клиента (Play WS). То же самое можно применить к Akka Http Client: stackoverflow.com/questions/37259206/…

Artavazd Balayan 20.04.2018 05:30
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
12
1
2 346
2

Ответы 2

Сложить влево

Простой foldLeft можно использовать для управления количеством фьючерсов, которые выполняются одновременно.

Во-первых, давайте создадим класс case под названием LazyFuture

case class LazyFuture[+A](f: Unit => Future[A]) {
  def apply() = f()
}

object LazyFuture {
  def apply[A](f: => A)(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => Future(f))

  def apply[A](f: => Future[A])(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => f)
}

LazyFuture немедленно останавливает запуск будущего

val list: List[LazyFuture[A]] = ...


list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])){ (r, c) =>
  val batch = Future.sequence(c.map(_()))
  batch.flatMap(values => r.map(rs => rs ++ values))
}

Измените concurFactor соответственно, чтобы запускать несколько фьючерсов одновременно.

concurFactor of 1 запустит одно будущее сразу

concurFactor of 2 будет запускать сразу два фьючерса

и так далее ...

def executeBatch[A](list: List[LazyFuture[A]])(concurFactor: Int) =
   list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])){ (r, c) =>
      val batch = Future.sequence(c.map(_()))
      r.flatMap(rs => batch.map(values => rs ++ values))
    }

Полный код

  case class LazyFuture[+A](f: Unit => Future[A]) {
    def apply() = f()
  }

  object LazyFuture {
    def apply[A](f: => A)(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => Future(f))

    def apply[A](f: => Future[A])(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => f)
  }

  def executeBatch[A](list: List[LazyFuture[A]])(concurFactor: Int)(implicit ec: ExecutionContext): Future[List[A]] =
    list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])) { (r, c) =>
      val batch = Future.sequence(c.map(_ ()))
      r.flatMap(rs => batch.map(values => rs ++ values))
    }

Ограничение контекста выполнения

Вы также можете ограничить вычислительные ресурсы, ограничив количество потоков в пуле выполнения. Но это решение не такое уж гибкое. Лично мне это не нравится.

val context: ExecutionContext = 
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8))

Вы должны не забыть передать правильный контекст выполнения, который является неявным значением. Иногда мы не знаем, какое неявное значение входит в область действия. Это глючит

Предупреждение

Когда будущее построено, как показано ниже

val foo = Future {
     1 + 2
} // future starts executing

LazyFuture(foo) // Not a right way

foo уже запущен, и его нельзя контролировать.

Правильный способ построения LazyFuture

val foo = LazyFuture {
  1 + 2
}

или же

val foo = LazyFuture {
  Future {
   1 + 2
  }
}

Рабочий пример

package main

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object Main {

  case class LazyFuture[A](f: Unit => Future[A]) {
    def apply(): Future[A] = f()
  }

  object LazyFuture {
    def apply[A](f: => A)(implicit ec: ExecutionContext): LazyFuture[A] = LazyFuture(_ => Future(f))
    def apply[A](f: => Future[A]): LazyFuture[A] = LazyFuture(_ => f)
  }

  def executeBatch[A](list: List[LazyFuture[A]])(concurFactor: Int)
    (implicit ec: ExecutionContext): Future[List[A]] =
    list.grouped(concurFactor).foldLeft(Future.successful(List.empty[A])) { (r, c) =>
      val batch = Future.sequence(c.map(_ ()))
      r.flatMap(rs => r.map(values=> rs ++ values))
    }

  def main(args: Array[String]): Unit = {
    import scala.concurrent.ExecutionContext.Implicits.global


    val futures: Seq[LazyFuture[Int]] = List(1, 2, 3, 4, 5).map { value =>
      LazyFuture {
        println(s"value: $value started")
        Thread.sleep(value * 200)
        println(s"value: $value stopped")
        value
      }
    }
    val f = executeBatch(futures.toList)(2)
    Await.result(f, Duration.Inf)
  }

}

Что вы имеете в виду, говоря о том, что ограничение количества потоков пула выполнения не является гибким? Что я потеряю, пойдя в этом направлении?

k0pernikus 19.04.2018 18:02

@ k0pernikus Вы должны помнить о передаче правильного контекста выполнения, который является неявным значением. Иногда мы не знаем, какое неявное значение входит в область действия. Его багги

pamu 19.04.2018 18:04

А как преобразовать List[Future[T]] в List[LazyFuture[T]]?

k0pernikus 19.04.2018 18:19

@ k0pernikus добавил конструктор. пожалуйста, посмотрите. Предупреждение: будущее должно быть неоцененным.

pamu 19.04.2018 18:26

@ k0pernikus лучше использовать конструктор LazyFuture для построения вычисления

pamu 19.04.2018 18:27

Просмотрите мой обновленный вопрос с трассировкой стека при использовании этого подхода, поскольку я вижу: Ошибка expression's type is not compatible with formal parameter type

k0pernikus 19.04.2018 18:48

Может быть, моя ошибка связана с тем, что я пытаюсь обернуть будущее внутри LazyFuture, тогда как я должен для начала создать LazyFuture?

k0pernikus 19.04.2018 18:51

@ k0pernikus Исправлены ошибки компиляции. ошибка была при объявлении LazyFuture

pamu 19.04.2018 19:24

@pamu ваше решение не запускает точно фьючерсы n в данный момент. Когда вы начинаете обрабатывать группу, она полностью использует пул. Но после того, как несколько задач готовы, он не берет задачи из следующей группы. И последнее задание в группе будет выполняться в одиночку.

simpadjo 20.04.2018 09:45

@simpadjo Ура! это зависит от сгруппированных. количество задач, разделенное на коэффициент совпадения, составляет сделку. если напоминание равно 0, то все в порядке, если не некоторые задачи, наконец, останутся запущенными. Но количество фьючерсов, работающих в данный момент времени, ограничено.

pamu 20.04.2018 10:54

@pamu нет, проблема существует для любого размера группы, поскольку задачи не завершаются одновременно. У меня была такая же проблема, и я не нашел красивого и простого неблокирующего решения.

simpadjo 20.04.2018 11:56

Это ведет себя не так, как ожидалось. Похоже, что на самом деле он не ждет завершения 10 фьючерсов, и после 70 LazyFutre он просто останавливается.

k0pernikus 23.04.2018 12:12

Параллелизм в Scala Futures управляется ExecutionContext. Обратите внимание, что фьючерсы начинают выполняться в контексте сразу после создания, поэтому ExecutionContext из Future.sequence не имеет особого значения. Вы должны предоставить соответствующий контекст при создании исходных фьючерсов из последовательности.

Контекст по умолчанию ExecutionContext.global (обычно импортируемый через import scala.concurrent.ExecutionContext.Implicits.global) использует столько потоков, сколько имеется ядер процессора, но он также может создавать множество дополнительных потоков для блокирующих задач, которые заключены в scala.concurrent.blocking. Обычно это желаемое поведение, но оно не подходит для вашей проблемы.

К счастью, вы можете использовать метод ExecutionContext.fromExecutor для обертывания пула потоков Java. Например:

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

val context = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
val seqOfFutures = Seq.fill(700)(Future { callRestApi() }(context))
val sequenceFuture = Future.sequence(seqOfFutures)(ExecutionContext.global)

Конечно, контекст также может быть предоставлен неявно:

implicit val context: ExecutionContext = 
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
val seqOfFutures = Seq.fill(700)(Future { callRestApi() })
// This `sequence` uses the same thread pool as the original futures
val sequenceFuture = Future.sequence(seqOfFutures) 

Другие вопросы по теме