Изящно закрыть канал и не отправлять по закрытому каналу

Я новичок в параллелизме Golang и работаю над тем, чтобы понять этот фрагмент кода, упомянутый ниже.

Я наблюдаю несколько вещей, которые я не могу объяснить, почему это происходит:

  1. при использовании i меньше, чем равно 100000 for i <= 100000 { в основной функции иногда печатаются разные значения для nResults и countWrites (в последних двух операторах) fmt.Printf("number of result writes %d\n", nResults) fmt.Printf("Number of job writes %d\n", jobWrites)

  2. при использовании i больше 1000000 дает panic: send on closed channel

Как я могу убедиться, что значения, отправляемые заданиям, не находятся в закрытом канале, и позже, после получения всех значений в результатах, мы можем закрыть канал без взаимоблокировки?

package main

import (
    "fmt"
    "sync"
)

func worker(wg *sync.WaitGroup, id int, jobs <-chan int, results chan<- int, countWrites *int64) {
    defer wg.Done()
    for j := range jobs {
        *countWrites += 1
        go func(j int) {
            if j%2 == 0 {
                results <- j * 2
            } else {
                results <- j
            }
        }(j)
    }
}

func main() {
    wg := &sync.WaitGroup{}
    jobs := make(chan int)
    results := make(chan int)
    var i int = 1
    var jobWrites int64 = 0
    for i <= 10000000 {
        go func(j int) {
            if j%2 == 0 {
                i += 99
                j += 99
            }
            jobWrites += 1
            jobs <- j
        }(i)
        i += 1
    }

    var nResults int64 = 0
    for w := 1; w < 1000; w++ {
        wg.Add(1)
        go worker(wg, w, jobs, results, &nResults)
    }

    close(jobs)
    wg.Wait()

    var sum int32 = 0
    var count int64 = 0
    for r := range results {
        count += 1
        sum += int32(r)
        if count == nResults {
            close(results)
        }
    }
    fmt.Println(sum)
    fmt.Printf("number of result writes %d\n", nResults)
    fmt.Printf("Number of job writes %d\n", jobWrites)
}

Создание API ввода вопросов на разных языках программирования (Python, PHP, Go и Node.js)
Создание API ввода вопросов на разных языках программирования (Python, PHP, Go и Node.js)
API ввода вопросов - это полезный инструмент для интеграции моделей машинного обучения, таких как ChatGPT, в приложения, требующие обработки...
1
0
67
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Довольно много проблем в вашем коде.

Отправка по закрытому каналу

Один общий принцип использования каналов Go заключается в следующем:

не закрывать канал со стороны получателя и не закрывать канал, если в канале есть несколько одновременных отправителей

(https://go101.org/article/channel-closing.html)

Решение для вас простое: не иметь нескольких одновременных отправителей, и тогда вы можете закрыть канал со стороны отправителя.

Вместо того, чтобы запускать миллионы отдельных горутин для каждого задания, которое вы добавляете в канал, запустите одну горутину, которая выполняет весь цикл, чтобы добавить все задания в канал. И закрыть канал после цикла. Рабочие будут потреблять канал так быстро, как только смогут.

Гонки данных за счет изменения общих переменных в нескольких горутинах

Вы изменяете две общие переменные, не предпринимая специальных действий:

  1. nResults, который вы передаете countWrites *int64 в воркере.
  2. i в цикле, который записывает в канал заданий: вы добавляете к нему 99 из нескольких горутин, что делает непредсказуемым, сколько значений вы на самом деле записываете в канал jobs

Для решения 1 есть много вариантов, в том числе с помощью sync.Mutex. Однако, поскольку вы просто добавляете к нему, самое простое решение — использовать atomic.AddInt64(countWrites, 1) вместо *countWrites += 1

Чтобы решить 2, используйте не одну горутину для каждой записи в канал, а одну горутину для всего цикла (см. выше)

Большое спасибо за подробный ответ. Я понимаю, что вы предлагаете это изменение go func() { for i := 1; i <= 10000; i++ { if i%2 == 0 { i += 99 } fmt.Printf("Value send to jobs is %d\n", i) jobs <- i } close(jobs) }() Но я все еще не понимаю, как я смогу закрыть канал результатов после того, как все результаты будут использованы? Должен ли я по-прежнему вести подсчет количества значений, которые были отправлены в результаты, или есть лучший способ?

Mayuresh Anand 17.11.2022 05:12

Как я понимаю, основная функция - это приемник для канала результатов, а также отправка значений несколькими одновременными работниками... поэтому в основном мы не должны закрывать канал в основном... но тогда, не закрывая канал, мы не можем разрешить взаимоблокировку. , так может быть, нужно вести счет?

Mayuresh Anand 17.11.2022 05:20

Кроме того, теперь я получаю эту паническую ошибку panic: too many concurrent operations on a single file or socket

Mayuresh Anand 17.11.2022 05:34

Вы не должны закрывать канал после того, как все результаты будут использованы: вы должны закрыть его после того, как в него будут записаны все данные. Вы можете читать из закрытого канала, и диапазон по каналу будет завершен, когда канал будет закрыт. Но нельзя писать в закрытый канал.

Erwin Bolwidt 17.11.2022 05:48

Вы также не должны писать в канал результатов в горутине в рабочей функции, я сначала этого не видел. Рабочая функция уже запущена в горутине. Просто отправьте прямо на канал results из worker, а не из другой горутины. Это, вероятно, вызывает слишком много одновременных операций, а также заставляет вас помечать рабочую группу как «Готово», когда вы не знаете, завершены ли внутренние горутины.

Erwin Bolwidt 17.11.2022 05:51

Другие вопросы по теме