Я новичок в параллелизме Golang и работаю над тем, чтобы понять этот фрагмент кода, упомянутый ниже.
Я наблюдаю несколько вещей, которые я не могу объяснить, почему это происходит:
при использовании i меньше, чем равно 100000 for i <= 100000 {
в основной функции иногда печатаются разные значения для nResults и countWrites (в последних двух операторах)
fmt.Printf("number of result writes %d\n", nResults) fmt.Printf("Number of job writes %d\n", jobWrites)
при использовании i больше 1000000 дает panic: send on closed channel
Как я могу убедиться, что значения, отправляемые заданиям, не находятся в закрытом канале, и позже, после получения всех значений в результатах, мы можем закрыть канал без взаимоблокировки?
package main
import (
"fmt"
"sync"
)
func worker(wg *sync.WaitGroup, id int, jobs <-chan int, results chan<- int, countWrites *int64) {
defer wg.Done()
for j := range jobs {
*countWrites += 1
go func(j int) {
if j%2 == 0 {
results <- j * 2
} else {
results <- j
}
}(j)
}
}
func main() {
wg := &sync.WaitGroup{}
jobs := make(chan int)
results := make(chan int)
var i int = 1
var jobWrites int64 = 0
for i <= 10000000 {
go func(j int) {
if j%2 == 0 {
i += 99
j += 99
}
jobWrites += 1
jobs <- j
}(i)
i += 1
}
var nResults int64 = 0
for w := 1; w < 1000; w++ {
wg.Add(1)
go worker(wg, w, jobs, results, &nResults)
}
close(jobs)
wg.Wait()
var sum int32 = 0
var count int64 = 0
for r := range results {
count += 1
sum += int32(r)
if count == nResults {
close(results)
}
}
fmt.Println(sum)
fmt.Printf("number of result writes %d\n", nResults)
fmt.Printf("Number of job writes %d\n", jobWrites)
}
Довольно много проблем в вашем коде.
Один общий принцип использования каналов Go заключается в следующем:
не закрывать канал со стороны получателя и не закрывать канал, если в канале есть несколько одновременных отправителей
(https://go101.org/article/channel-closing.html)
Решение для вас простое: не иметь нескольких одновременных отправителей, и тогда вы можете закрыть канал со стороны отправителя.
Вместо того, чтобы запускать миллионы отдельных горутин для каждого задания, которое вы добавляете в канал, запустите одну горутину, которая выполняет весь цикл, чтобы добавить все задания в канал. И закрыть канал после цикла. Рабочие будут потреблять канал так быстро, как только смогут.
Вы изменяете две общие переменные, не предпринимая специальных действий:
nResults
, который вы передаете countWrites *int64
в воркере.i
в цикле, который записывает в канал заданий: вы добавляете к нему 99 из нескольких горутин, что делает непредсказуемым, сколько значений вы на самом деле записываете в канал jobs
Для решения 1 есть много вариантов, в том числе с помощью sync.Mutex
. Однако, поскольку вы просто добавляете к нему, самое простое решение — использовать atomic.AddInt64(countWrites, 1)
вместо *countWrites += 1
Чтобы решить 2, используйте не одну горутину для каждой записи в канал, а одну горутину для всего цикла (см. выше)
Как я понимаю, основная функция - это приемник для канала результатов, а также отправка значений несколькими одновременными работниками... поэтому в основном мы не должны закрывать канал в основном... но тогда, не закрывая канал, мы не можем разрешить взаимоблокировку. , так может быть, нужно вести счет?
Кроме того, теперь я получаю эту паническую ошибку panic: too many concurrent operations on a single file or socket
Вы не должны закрывать канал после того, как все результаты будут использованы: вы должны закрыть его после того, как в него будут записаны все данные. Вы можете читать из закрытого канала, и диапазон по каналу будет завершен, когда канал будет закрыт. Но нельзя писать в закрытый канал.
Вы также не должны писать в канал результатов в горутине в рабочей функции, я сначала этого не видел. Рабочая функция уже запущена в горутине. Просто отправьте прямо на канал results
из worker
, а не из другой горутины. Это, вероятно, вызывает слишком много одновременных операций, а также заставляет вас помечать рабочую группу как «Готово», когда вы не знаете, завершены ли внутренние горутины.
Большое спасибо за подробный ответ. Я понимаю, что вы предлагаете это изменение
go func() { for i := 1; i <= 10000; i++ { if i%2 == 0 { i += 99 } fmt.Printf("Value send to jobs is %d\n", i) jobs <- i } close(jobs) }()
Но я все еще не понимаю, как я смогу закрыть канал результатов после того, как все результаты будут использованы? Должен ли я по-прежнему вести подсчет количества значений, которые были отправлены в результаты, или есть лучший способ?