Почему этот код Scala выполняет два фьючерса в одном потоке?

Я давно использую несколько потоков, но не могу объяснить такой простой случай.

import java.util.concurrent.Executors
import scala.concurrent._
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))

def addOne(x: Int) = Future(x + 1)
def addTwo(x: Int) = Future {addOne(x + 1)}

addTwo(1)
// res5: Future[Future[Int]] = Future(Success(Future(Success(3))))

К моему удивлению, это работает. И я не знаю, почему.

Вопрос:
Почему данный поток один может выполнять два фьючерса одновременно?

Мое ожидание:
Первый Future (addTwo) занимает один-единственный поток (newFixedThreadPool(1)), затем он вызывает другой Future (addOne), которому снова нужен другой поток.
Таким образом, программа должна в конечном итоге лишиться потоков и застрять.

Существует только один поток, но есть и неограниченная очередь, которая примет все фьючерсы, над которыми в данный момент нельзя работать. Если ни один из ваших фьючерсов не заблокируется, они в конечном итоге завершатся, и будет обработан следующий из очереди. Но да, если вы заблокируете этот поток, все застрянет. def addThree(x) = Future(Await.result(addOne(x))) сделал бы это.

Thilo 01.07.2019 07:58
Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
10
1
803
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

расширить его до Promise легко понять

val p1 = Promise[Future[Int]]
ec.execute(() => {
  // the fist task is start run
  val p2 = Promise[Int]
  //the second task is submit , but no run
  ec.execute(() => {
    p2.complete(Success(1))
    println(s"task 2 -> p1:${p1},p2:${p2}")
  })
  //here the p1 is completed, not wait p2.future finish
  p1.complete(Success(p2.future))
  println(s"task 1 -> p1:${p1},p2:${p2}")// you can see the p1 is completed but the p2 have not
  //first task is finish, will run second task
})
val result: Future[Future[Int]] = p1.future

Thread.sleep(1000)
println(result)
Ответ принят как подходящий

Причина того, что ваш код работает, заключается в том, что оба фьючерса будут выполняться одним и тем же потоком. Создаваемый вами ExecutionContext не будет использовать Thread напрямую для каждого Future, а вместо этого будет планировать выполнение задач (экземпляров Runnable). Если в пуле больше нет доступных потоков, эти задачи будут помещены в BlockingQueue, ожидающие выполнения. (Подробности см. в API ThreadPoolExecutor)

Если вы посмотрите на реализацию Executors.newFixedThreadPool(1), вы увидите, что она создает Executor с неограниченной очередью:

new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable])

Чтобы получить эффект голодания потока, который вы искали, вы можете сами создать исполнителя с ограниченной очередью:

 implicit val ec = ExecutionContext.fromExecutor(new ThreadPoolExecutor(1, 1, 0L, 
                     TimeUnit.MILLISECONDS, new ArrayBlockingQueue[Runnable](1)))

Поскольку минимальная емкость ArrayBlockingQueue равна 1, вам потребуется три фьючерса, чтобы достичь предела, и вам также нужно будет добавить некоторый код, который будет выполняться в результате фьючерса, чтобы предотвратить их завершение (в приведенном ниже примере я делаю это добавив .map(identity))

Следующий пример

import scala.concurrent._
implicit val ec = ExecutionContext.fromExecutor(new ThreadPoolExecutor(1, 1, 0L, 
                      TimeUnit.MILLISECONDS, new ArrayBlockingQueue[Runnable](1)))

def addOne(x: Int) = Future {
  x + 1
}
def addTwo(x: Int) = Future {
  addOne(x + 1) .map(identity)
}
def addThree(x: Int) = Future {
  addTwo(x + 1).map(identity)
}

println(addThree(1))

терпит неудачу с

java.util.concurrent.RejectedExecutionException: Task scala.concurrent.impl.CallbackRunnable@65a264b6 rejected from java.util.concurrent.ThreadPoolExecutor@10d078f4[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 1]

Другие вопросы по теме