Запуск future.map внутри future с помощью одного потока forkjoinpool

У меня был код, в котором использовалась сборка ExecutionContext (EC) с akka (ActorSystem). Этот код делает что-то весьма своеобразное: он использует AkkaForkJoinPool с parallelism-max = 1 и выполняет что-то вроде:

implicit ec = // akka EC backed by AkkaForkJoinPool with parallelism=1

Future{ // (1)
  // (2) get data from DB which uses a separate ExecutionContext for IO
  val data: Future[Data] = getData()

  // (3) use the data
  data.map{ whatEver }

  // etc ...
}

[Edit: Я знаю, что это странно - иметь верхнее будущее (1). Но на самом деле это не мой код, он охватывает несколько функций и использует более сложные операции, такие как несколько упакованных для понимания. Так что я не буду это менять]

Теперь я переместил этот код и заменил неявный ExecutionContext (EC), предоставляемый akka, своим собственным, следуя тому же правилу: я использую (java) ForkJoinPool с parallelism = 1.

Как следствие, этот код застревает на карте (3). Насколько я понимаю, когда вызывается карта (3), ей требуется поток, но ЕС не может его предоставить, потому что его единственный доступный поток используется Future (1).

Я не понимаю, как должен работать ForkJoinPool. Итак, мой вопрос: правильно ли я понял, и:

  1. в противном случае я неправильно использую Java ForkJoinPool. Т.е. есть способ заставить это работать?
  2. если да, то как акка с этим справляется?

Я использую akka 2.3.15, scala 2.11.12 и java 8

Возможно ли, что при изменении контекста выполнения вы внезапно используете тот же контекст выполнения, что и getData? В этом случае это привело бы к тупиковой ситуации.

Jamie 13.09.2018 19:26

Если я использую тот же EC, что и getData (2) в map (3), он действительно работает. Так что нет, я не смешивал EC.

Juh_ 13.09.2018 19:29

Как насчет 1? 1 тоже использует тот же самый ec? вот что могло бы вызвать тупик. То есть 2 или 3 с использованием того же ec, что и 1.

Jamie 13.09.2018 19:51

3 используют тот же EC, что и 1, что вызывает блокировку. Но почему-то с помощью акки заработало. Я думаю, что либо akka делает что-то умное, чтобы заставить его работать, либо я делаю что-то не так (я не знаю, как должен работать ForkJoinPool, но меня не удивит, что он может справиться с этим, если хорошо закодирован) .

Juh_ 13.09.2018 19:58

Не думаю, что акка делает что-то особенное. может это не совсем parallelism-max 1? Я думаю, что определенно ожидается, что если 1 и 3 будут использовать один и тот же ec с одним потоком, они зайдут в тупик. Можете добавить свою конфигурацию акка?

Jamie 13.09.2018 20:09

У меня те же сомнения по поводу акка, но потом я бы спросил: вы уверены? Я заглянул в код и не увидел ничего особенного, но код слишком сложен, чтобы быть уверенным.

Juh_ 13.09.2018 20:31

Однако я почти уверен в конфигурации akka, потому что я написал упрощенную программу для чтения конфигураций, которая делает то же самое, что и akka, и избегает одновременного изменения как кода распространения, так и системы конфигурации. Но мне никогда не удается быть уверенным, как akka читает конфигурацию, особенно как они заполняют значения по умолчанию и не отменяют ли они некоторые из них одновременно.

Juh_ 13.09.2018 20:37
0
7
99
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Вместо того, чтобы оборачивать все в будущее, используйте for-понимание результата первого будущего, поскольку все зависит от него.

for {
  data <- getData()
} yield data.map( whatEver )

или

getData().map { data =>
  data.map { whatEver }
}

Я как раз писал правку об этом. Как я уже сказал, сейчас это слишком сложно изменить. Но мне все же хотелось бы узнать, как это работало с аккой

Juh_ 13.09.2018 19:22
Ответ принят как подходящий

Заглянув в код akka, я думаю, что нашел, что он делает. Я не совсем уверен, но почти: akka ActorSystem создает Dispatchers, который создает MessageDispatcherConfigurator, который создает Dispatcher, который создает ExecutorService (я передаю иерархию классов этого). Есть несколько возможных реализаций, но я думаю, что это наиболее распространенная, и именно это происходит при использовании ForkJoinPool.

Теперь диспетчер расширяет BatchingExecutor, который может объединять внутреннюю задачу, такую ​​как карта в вопросе (которая требует запуска потока), до текущего потока.

Опять же, код слишком сложен, чтобы быть уверенным, и я не буду углубляться в подробности. Но на самом деле akka EC может переносить вызов внутренней карты в родительский поток, чего не происходит со стандартным (т.е. java) ForkJoinPool.

Думаю, это хитрый трюк от акка, а не типовая реализация. В документе BatchingExecutor говорится:

/**
 * Mixin trait for an Executor
 * which groups multiple nested `Runnable.run()` calls
 * into a single Runnable passed to the original
 * Executor. This can be a useful optimization
 * because it bypasses the original context's task
 * queue and keeps related (nested) code on a single
 * thread which may improve CPU affinity. However,
 * if tasks passed to the Executor are blocking
 * or expensive, this optimization can prevent work-stealing
 * and make performance worse. Also, some ExecutionContext
 * may be fast enough natively that this optimization just
 * adds overhead.
 * The default ExecutionContext.global is already batching
 * or fast enough not to benefit from it; while
 * `fromExecutor` and `fromExecutorService` do NOT add
 * this optimization since they don't know whether the underlying
 * executor will benefit from it.
 * A batching executor can create deadlocks if code does
 * not use `scala.concurrent.blocking` when it should,
 * because tasks created within other tasks will block
 * on the outer task completing.
 * This executor may run tasks in any order, including LIFO order.
 * There are no ordering guarantees.
 *
 * WARNING: The underlying Executor's execute-method must not execute the submitted Runnable
 * in the calling thread synchronously. It must enqueue/handoff the Runnable.
 */

Другие вопросы по теме