Как безопасно выбирать каналы, некоторые из которых могут быть одновременно закрыты?

В то время как отвечая на вопрос я попытался реализовать настройку, в которой основной поток объединяет усилия CommonPool для параллельного выполнения ряда независимых задач (так работает java.util.streams).

Я создаю столько участников, сколько потоков CommonPool, плюс канал для основного потока. Актеры используют каналы рандеву:

val resultChannel = Channel<Double>(UNLIMITED)
val poolComputeChannels = (1..commonPool().parallelism).map {
    actor<Task>(CommonPool) {
        for (task in channel) {
            task.execute().also { resultChannel.send(it) }
        }
    }
}
val mainComputeChannel = Channel<Task>()
val allComputeChannels = poolComputeChannels + mainComputeChannel

Это позволяет мне распределять нагрузку с помощью выражения select, чтобы найти бездействующего участника для каждой задачи:

select {
    allComputeChannels.forEach { chan ->
        chan.onSend(task) {}
    }
}

Так что отправляю все задачи и закрываю каналы:

launch(CommonPool) {
    jobs.forEach { task ->
        select {
            allComputeChannels.forEach { chan ->
                chan.onSend(task) {}
            }
        }
    }
    allComputeChannels.forEach { it.close() }
}

Теперь мне нужно написать код для основного потока. Здесь я решил обслуживать как mainComputeChannel, выполняя задачи, переданные в основной поток, так и resultChannel, собирая отдельные результаты в окончательную сумму:

return runBlocking {
    var completedCount = 0
    var sum = 0.0
    while (completedCount < NUM_TASKS) {
        select<Unit> {
            mainComputeChannel.onReceive { task ->
                task.execute().also { resultChannel.send(it) }
            }
            resultChannel.onReceive { result ->
                sum += result
                completedCount++
            }
        }
    }
    resultChannel.close()
    sum
}

Это приводит к ситуации, когда mainComputeChannel может быть закрыт из потока CommonPool, но resultChannel все еще нуждается в обслуживании. Если канал закрыт, onReceive выдаст исключение, и onReceiveOrNull немедленно выберет null. Ни один из вариантов не приемлем. Я не нашел способа избежать регистрации mainComputeChannel, если он закрыт. Если я использую if (!mainComputeChannel.isClosedForReceive), он не будет атомарным с регистрационным вызовом.

Это подводит меня к моему вопросу: что было бы хорошей идиомой для выбора каналов, где некоторые могут быть закрыты другим потоком, а другие все еще работают?

Стоит ли изучать PHP в 2026-2027 годах?
Стоит ли изучать PHP в 2026-2027 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
0
366
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

В библиотеке kotlinx.coroutines в настоящее время отсутствует примитив для удобства. Выдающееся предложение - добавить функцию receiveOrClose и пункт onReceiveOrClosed для select, что сделало бы возможным написание такого кода.

Однако вам все равно придется вручную отслеживать тот факт, что ваш mainComputeChannel был закрыт, и перестать выбирать его, когда это было. Итак, используя предложенное предложение onReceiveOrClosed, вы напишете что-то вроде этого:

// outside of loop
var mainComputeChannelClosed = false
// inside loop
select<Unit> {
    if (!mainComputeChannelClosed) {
        mainComputeChannel.onReceiveOrClosed { 
            if (it.isClosed) mainComputeChannelClosed = true 
            else { /* do something with it */ }
        }
    }
    // more clauses
}    

Подробнее см. https://github.com/Kotlin/kotlinx.coroutines/issues/330.

В таблице нет предложений по дальнейшему упрощению такой схемы.

Я потратил некоторое время на размышления об этом, похоже, что onReceiveOrClosed - это то, что я считал контрактом с onReceiveOrNull. Таким образом, onReceiveOrNull будет выбирать только с помощью nullнемедленно, если канал уже закрыт, а в противном случае он вызовет исключение, как onReceive? Оказывается, я уже рассмотрел идиому, которую вы здесь представляете (но по ошибке использовал onReceiveOrNull), и отказался от нее как слишком сложной для веса, который она тянет. Вместо этого я решил просто оставить каналы открытыми и использовать индивидуальную сигнализацию о завершении.

Marko Topolnik 25.07.2018 10:40

Худший аспект этой идиомы заключается в том, что вам придется повторять ее для каждой отдельной регистрации в пункте select.

Marko Topolnik 25.07.2018 10:41

Разница между onReceiveOrNull и onReceiveOrClosed будет для каналов, которые закрыты за исключением. onReceiveOrNull вызовет исключение, а onReceiveOrClose вернет соответствующую причину закрытия. Это важно для написания универсальных операторов комбинирования (список zipLatest и т. д.), Которые должны правильно распространять ошибки из одного из их источников. По непосредственность нет различий в терминах. Все предложения одинаково реагируют на изменения в состоянии канала, независимо от того, действовало ли состояние уже на момент их вызова или позже.

Roman Elizarov 26.07.2018 11:14

Спасибо, основываясь на том, что я мог бы использовать здесь идиому для реализации того, что я хотел, единственный оставшийся вопрос - практична ли эта идиома.

Marko Topolnik 26.07.2018 12:03

Другие вопросы по теме