Я давно использую несколько потоков, но не могу объяснить такой простой случай.
import java.util.concurrent.Executors
import scala.concurrent._
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))
def addOne(x: Int) = Future(x + 1)
def addTwo(x: Int) = Future {addOne(x + 1)}
addTwo(1)
// res5: Future[Future[Int]] = Future(Success(Future(Success(3))))
К моему удивлению, это работает. И я не знаю, почему.
Вопрос:
Почему данный поток один может выполнять два фьючерса одновременно?
Мое ожидание:
Первый Future (addTwo) занимает один-единственный поток (newFixedThreadPool(1)), затем он вызывает другой Future (addOne), которому снова нужен другой поток.
Таким образом, программа должна в конечном итоге лишиться потоков и застрять.





расширить его до Promise легко понять
val p1 = Promise[Future[Int]]
ec.execute(() => {
// the fist task is start run
val p2 = Promise[Int]
//the second task is submit , but no run
ec.execute(() => {
p2.complete(Success(1))
println(s"task 2 -> p1:${p1},p2:${p2}")
})
//here the p1 is completed, not wait p2.future finish
p1.complete(Success(p2.future))
println(s"task 1 -> p1:${p1},p2:${p2}")// you can see the p1 is completed but the p2 have not
//first task is finish, will run second task
})
val result: Future[Future[Int]] = p1.future
Thread.sleep(1000)
println(result)
Причина того, что ваш код работает, заключается в том, что оба фьючерса будут выполняться одним и тем же потоком. Создаваемый вами ExecutionContext не будет использовать Thread напрямую для каждого Future, а вместо этого будет планировать выполнение задач (экземпляров Runnable). Если в пуле больше нет доступных потоков, эти задачи будут помещены в BlockingQueue, ожидающие выполнения. (Подробности см. в API ThreadPoolExecutor)
Если вы посмотрите на реализацию Executors.newFixedThreadPool(1), вы увидите, что она создает Executor с неограниченной очередью:
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable])
Чтобы получить эффект голодания потока, который вы искали, вы можете сами создать исполнителя с ограниченной очередью:
implicit val ec = ExecutionContext.fromExecutor(new ThreadPoolExecutor(1, 1, 0L,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue[Runnable](1)))
Поскольку минимальная емкость ArrayBlockingQueue равна 1, вам потребуется три фьючерса, чтобы достичь предела, и вам также нужно будет добавить некоторый код, который будет выполняться в результате фьючерса, чтобы предотвратить их завершение (в приведенном ниже примере я делаю это добавив .map(identity))
Следующий пример
import scala.concurrent._
implicit val ec = ExecutionContext.fromExecutor(new ThreadPoolExecutor(1, 1, 0L,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue[Runnable](1)))
def addOne(x: Int) = Future {
x + 1
}
def addTwo(x: Int) = Future {
addOne(x + 1) .map(identity)
}
def addThree(x: Int) = Future {
addTwo(x + 1).map(identity)
}
println(addThree(1))
терпит неудачу с
java.util.concurrent.RejectedExecutionException: Task scala.concurrent.impl.CallbackRunnable@65a264b6 rejected from java.util.concurrent.ThreadPoolExecutor@10d078f4[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 1]
Существует только один поток, но есть и неограниченная очередь, которая примет все фьючерсы, над которыми в данный момент нельзя работать. Если ни один из ваших фьючерсов не заблокируется, они в конечном итоге завершатся, и будет обработан следующий из очереди. Но да, если вы заблокируете этот поток, все застрянет.
def addThree(x) = Future(Await.result(addOne(x)))сделал бы это.