У меня был код, в котором использовалась сборка ExecutionContext (EC) с akka (ActorSystem). Этот код делает что-то весьма своеобразное: он использует AkkaForkJoinPool с parallelism-max = 1
и выполняет что-то вроде:
implicit ec = // akka EC backed by AkkaForkJoinPool with parallelism=1
Future{ // (1)
// (2) get data from DB which uses a separate ExecutionContext for IO
val data: Future[Data] = getData()
// (3) use the data
data.map{ whatEver }
// etc ...
}
[Edit: Я знаю, что это странно - иметь верхнее будущее (1). Но на самом деле это не мой код, он охватывает несколько функций и использует более сложные операции, такие как несколько упакованных для понимания. Так что я не буду это менять]
Теперь я переместил этот код и заменил неявный ExecutionContext (EC), предоставляемый akka, своим собственным, следуя тому же правилу: я использую (java) ForkJoinPool с parallelism = 1.
Как следствие, этот код застревает на карте (3). Насколько я понимаю, когда вызывается карта (3), ей требуется поток, но ЕС не может его предоставить, потому что его единственный доступный поток используется Future (1).
Я не понимаю, как должен работать ForkJoinPool. Итак, мой вопрос: правильно ли я понял, и:
Я использую akka 2.3.15, scala 2.11.12 и java 8
Если я использую тот же EC, что и getData (2) в map (3), он действительно работает. Так что нет, я не смешивал EC.
Как насчет 1? 1 тоже использует тот же самый ec? вот что могло бы вызвать тупик. То есть 2 или 3 с использованием того же ec, что и 1.
3 используют тот же EC, что и 1, что вызывает блокировку. Но почему-то с помощью акки заработало. Я думаю, что либо akka делает что-то умное, чтобы заставить его работать, либо я делаю что-то не так (я не знаю, как должен работать ForkJoinPool, но меня не удивит, что он может справиться с этим, если хорошо закодирован) .
Не думаю, что акка делает что-то особенное. может это не совсем parallelism-max 1? Я думаю, что определенно ожидается, что если 1 и 3 будут использовать один и тот же ec с одним потоком, они зайдут в тупик. Можете добавить свою конфигурацию акка?
У меня те же сомнения по поводу акка, но потом я бы спросил: вы уверены? Я заглянул в код и не увидел ничего особенного, но код слишком сложен, чтобы быть уверенным.
Однако я почти уверен в конфигурации akka, потому что я написал упрощенную программу для чтения конфигураций, которая делает то же самое, что и akka, и избегает одновременного изменения как кода распространения, так и системы конфигурации. Но мне никогда не удается быть уверенным, как akka читает конфигурацию, особенно как они заполняют значения по умолчанию и не отменяют ли они некоторые из них одновременно.
Вместо того, чтобы оборачивать все в будущее, используйте for-понимание результата первого будущего, поскольку все зависит от него.
for {
data <- getData()
} yield data.map( whatEver )
или
getData().map { data =>
data.map { whatEver }
}
Я как раз писал правку об этом. Как я уже сказал, сейчас это слишком сложно изменить. Но мне все же хотелось бы узнать, как это работало с аккой
Заглянув в код akka, я думаю, что нашел, что он делает. Я не совсем уверен, но почти: akka ActorSystem создает Dispatchers
, который создает MessageDispatcherConfigurator
, который создает Dispatcher
, который создает ExecutorService (я передаю иерархию классов этого). Есть несколько возможных реализаций, но я думаю, что это наиболее распространенная, и именно это происходит при использовании ForkJoinPool.
Теперь диспетчер расширяет BatchingExecutor
, который может объединять внутреннюю задачу, такую как карта в вопросе (которая требует запуска потока), до текущего потока.
Опять же, код слишком сложен, чтобы быть уверенным, и я не буду углубляться в подробности. Но на самом деле akka EC может переносить вызов внутренней карты в родительский поток, чего не происходит со стандартным (т.е. java) ForkJoinPool.
Думаю, это хитрый трюк от акка, а не типовая реализация. В документе BatchingExecutor говорится:
/**
* Mixin trait for an Executor
* which groups multiple nested `Runnable.run()` calls
* into a single Runnable passed to the original
* Executor. This can be a useful optimization
* because it bypasses the original context's task
* queue and keeps related (nested) code on a single
* thread which may improve CPU affinity. However,
* if tasks passed to the Executor are blocking
* or expensive, this optimization can prevent work-stealing
* and make performance worse. Also, some ExecutionContext
* may be fast enough natively that this optimization just
* adds overhead.
* The default ExecutionContext.global is already batching
* or fast enough not to benefit from it; while
* `fromExecutor` and `fromExecutorService` do NOT add
* this optimization since they don't know whether the underlying
* executor will benefit from it.
* A batching executor can create deadlocks if code does
* not use `scala.concurrent.blocking` when it should,
* because tasks created within other tasks will block
* on the outer task completing.
* This executor may run tasks in any order, including LIFO order.
* There are no ordering guarantees.
*
* WARNING: The underlying Executor's execute-method must not execute the submitted Runnable
* in the calling thread synchronously. It must enqueue/handoff the Runnable.
*/
Возможно ли, что при изменении контекста выполнения вы внезапно используете тот же контекст выполнения, что и
getData
? В этом случае это привело бы к тупиковой ситуации.