В то время как отвечая на вопрос я попытался реализовать настройку, в которой основной поток объединяет усилия CommonPool для параллельного выполнения ряда независимых задач (так работает java.util.streams).
Я создаю столько участников, сколько потоков CommonPool, плюс канал для основного потока. Актеры используют каналы рандеву:
val resultChannel = Channel<Double>(UNLIMITED)
val poolComputeChannels = (1..commonPool().parallelism).map {
actor<Task>(CommonPool) {
for (task in channel) {
task.execute().also { resultChannel.send(it) }
}
}
}
val mainComputeChannel = Channel<Task>()
val allComputeChannels = poolComputeChannels + mainComputeChannel
Это позволяет мне распределять нагрузку с помощью выражения select, чтобы найти бездействующего участника для каждой задачи:
select {
allComputeChannels.forEach { chan ->
chan.onSend(task) {}
}
}
Так что отправляю все задачи и закрываю каналы:
launch(CommonPool) {
jobs.forEach { task ->
select {
allComputeChannels.forEach { chan ->
chan.onSend(task) {}
}
}
}
allComputeChannels.forEach { it.close() }
}
Теперь мне нужно написать код для основного потока. Здесь я решил обслуживать как mainComputeChannel, выполняя задачи, переданные в основной поток, так и resultChannel, собирая отдельные результаты в окончательную сумму:
return runBlocking {
var completedCount = 0
var sum = 0.0
while (completedCount < NUM_TASKS) {
select<Unit> {
mainComputeChannel.onReceive { task ->
task.execute().also { resultChannel.send(it) }
}
resultChannel.onReceive { result ->
sum += result
completedCount++
}
}
}
resultChannel.close()
sum
}
Это приводит к ситуации, когда mainComputeChannel может быть закрыт из потока CommonPool, но resultChannel все еще нуждается в обслуживании. Если канал закрыт, onReceive выдаст исключение, и onReceiveOrNull немедленно выберет null. Ни один из вариантов не приемлем. Я не нашел способа избежать регистрации mainComputeChannel, если он закрыт. Если я использую if (!mainComputeChannel.isClosedForReceive), он не будет атомарным с регистрационным вызовом.
Это подводит меня к моему вопросу: что было бы хорошей идиомой для выбора каналов, где некоторые могут быть закрыты другим потоком, а другие все еще работают?





В библиотеке kotlinx.coroutines в настоящее время отсутствует примитив для удобства. Выдающееся предложение - добавить функцию receiveOrClose и пункт onReceiveOrClosed для select, что сделало бы возможным написание такого кода.
Однако вам все равно придется вручную отслеживать тот факт, что ваш mainComputeChannel был закрыт, и перестать выбирать его, когда это было. Итак, используя предложенное предложение onReceiveOrClosed, вы напишете что-то вроде этого:
// outside of loop
var mainComputeChannelClosed = false
// inside loop
select<Unit> {
if (!mainComputeChannelClosed) {
mainComputeChannel.onReceiveOrClosed {
if (it.isClosed) mainComputeChannelClosed = true
else { /* do something with it */ }
}
}
// more clauses
}
Подробнее см. https://github.com/Kotlin/kotlinx.coroutines/issues/330.
В таблице нет предложений по дальнейшему упрощению такой схемы.
Худший аспект этой идиомы заключается в том, что вам придется повторять ее для каждой отдельной регистрации в пункте select.
Разница между onReceiveOrNull и onReceiveOrClosed будет для каналов, которые закрыты за исключением. onReceiveOrNull вызовет исключение, а onReceiveOrClose вернет соответствующую причину закрытия. Это важно для написания универсальных операторов комбинирования (список zipLatest и т. д.), Которые должны правильно распространять ошибки из одного из их источников. По непосредственность нет различий в терминах. Все предложения одинаково реагируют на изменения в состоянии канала, независимо от того, действовало ли состояние уже на момент их вызова или позже.
Спасибо, основываясь на том, что я мог бы использовать здесь идиому для реализации того, что я хотел, единственный оставшийся вопрос - практична ли эта идиома.
Я потратил некоторое время на размышления об этом, похоже, что
onReceiveOrClosed- это то, что я считал контрактом сonReceiveOrNull. Таким образом,onReceiveOrNullбудет выбирать только с помощьюnullнемедленно, если канал уже закрыт, а в противном случае он вызовет исключение, какonReceive? Оказывается, я уже рассмотрел идиому, которую вы здесь представляете (но по ошибке использовалonReceiveOrNull), и отказался от нее как слишком сложной для веса, который она тянет. Вместо этого я решил просто оставить каналы открытыми и использовать индивидуальную сигнализацию о завершении.