У меня возникла проблема при использовании waitgroup
с каналом buffered
. Проблема в том, что waitgroup
закрывается до того, как канал будет прочитан полностью, из-за чего мой канал прочитан наполовину и прерывается между ними.
func main() {
var wg sync.WaitGroup
var err error
start := time.Now()
students := make([]studentDetails, 0)
studentCh := make(chan studentDetail, 10000)
errorCh := make(chan error, 1)
wg.Add(1)
go s.getDetailStudents(rCtx, studentCh , errorCh, &wg, s.Link, false)
go func(ch chan studentDetail, e chan error) {
LOOP:
for {
select {
case p, ok := <-ch:
if ok {
L.Printf("Links %s: [%s]\n", p.title, p.link)
students = append(students, p)
} else {
L.Print("Closed channel")
break LOOP
}
case err = <-e:
if err != nil {
break
}
}
}
}(studentCh, errorCh)
wg.Wait()
close(studentCh)
close(errorCh)
L.Warnln("closed: all wait-groups completed!")
L.Warnf("total items fetched: %d", len(students))
elapsed := time.Since(start)
L.Warnf("operation took %s", elapsed)
}
Проблема в том, что эта функция recursive
. Я имею в виду несколько http call to fetch students
, а затем сделать больше звонков в зависимости от состояния.
func (s Student) getDetailStudents(rCtx context.Context, content chan<- studentDetail, errorCh chan<- error, wg *sync.WaitGroup, url string, subSection bool) {
util.MustNotNil(rCtx)
L := logger.GetLogger(rCtx)
defer func() {
L.Println("Closing all waitgroup!")
wg.Done()
}()
wc := getWC()
httpClient := wc.Registry.MustHTTPClient()
res, err := httpClient.Get(url)
if err != nil {
L.Fatal(err)
}
defer res.Body.Close()
if res.StatusCode != 200 {
L.Errorf("status code error: %d %s", res.StatusCode, res.Status)
errorCh <- errors.New("service_status_code")
return
}
// parse response and return error if found some through errorCh as done above.
// decide page subSection based on response if it is more.
if !subSection {
wg.Add(1)
go s.getDetailStudents(rCtx, content, errorCh, wg, link, true)
// L.Warnf("total pages found %d", pageSub.Length()+1)
}
// Find students from response list and parse each Student
students := s.parseStudentItemList(rCtx, item)
for _, student := range students {
content <- student
}
L.Warnf("Calling HTTP Service for %q with total %d record", url, elementsSub.Length())
}
Переменные изменены, чтобы избежать оригинальной кодовой базы.
Проблема в том, что студенты считываются случайным образом, как только группа ожидания завершает работу. Я ожидаю, что выполнение будет выполнено до тех пор, пока все студенты не будут прочитаны. В случае ошибки оно должно прерваться, как только возникнет ошибка.
Это хорошая практика, чтобы использовать его рядом с defer wg.Done()
, но я все еще не уверен, что это так. Но я думаю, что если код имеет задержку при записи в канал, то он должен ждать, пока все значения каналов не будут прочитаны.
Вам нужно знать, когда завершается приемная горутина. WaitGroup делает это для генерирующей горутины. Таким образом, вы можете использовать две группы ожидания:
wg.Add(1)
go s.getDetailStudents(rCtx, studentCh , errorCh, &wg, s.Link, false)
wgReader.Add(1)
go func(ch chan studentDetail, e chan error) {
defer wgReader.Done()
...
}
wg.Wait()
close(studentCh)
close(errorCh)
wgReader.Wait() // Wait for the readers to complete
Это не сработает. Проблема не в том, чтобы разделить 2 goroutines
на разные waitgroup
. Поскольку goroutines
является рекурсивным, поэтому не нужно проверять, когда он будет закрыт. Проблема в том, что waitgroup
закрывается перед записью на канал. Я мог бы ограничить проблему, уменьшив размер буфера канала, но этого не ожидается,
Первая группа ожидания ждет, пока не закончатся писатели. Второй необходимо дождаться, пока читатель не сделает.
Спасибо понял проблему. Я не мог контролировать close
с помощью одного waitgroup
. У меня есть 2 отдельных goroutines
, и теперь я могу контролировать выполнение каждого фрагмента кода.
Поскольку вы используете буферизованные каналы, вы можете получить оставшиеся значения после закрытия канала. Вам также понадобится механизм, предотвращающий слишком раннее завершение вашей основной функции, пока читатель все еще выполняет работу, как посоветовал @Burak Serdar.
Я реструктурировал код, чтобы дать рабочий пример, но он должен донести суть.
package main
import (
"context"
"log"
"sync"
"time"
)
type studentDetails struct {
title string
link string
}
func main() {
var wg sync.WaitGroup
var err error
students := make([]studentDetails, 0)
studentCh := make(chan studentDetails, 10000)
errorCh := make(chan error, 1)
start := time.Now()
wg.Add(1)
go getDetailStudents(context.TODO(), studentCh, errorCh, &wg, "http://example.com", false)
LOOP:
for {
select {
case p, ok := <-studentCh:
if ok {
log.Printf("Links %s: [%s]\n", p.title, p.link)
students = append(students, p)
} else {
log.Println("Draining student channel")
for p := range studentCh {
log.Printf("Links %s: [%s]\n", p.title, p.link)
students = append(students, p)
}
break LOOP
}
case err = <-errorCh:
if err != nil {
break LOOP
}
case <-wrapWait(&wg):
close(studentCh)
}
}
close(errorCh)
elapsed := time.Since(start)
log.Printf("operation took %s", elapsed)
}
func getDetailStudents(rCtx context.Context, content chan<- studentDetails, errorCh chan<- error, wg *sync.WaitGroup, url string, subSection bool) {
defer func() {
log.Println("Closing")
wg.Done()
}()
if !subSection {
wg.Add(1)
go getDetailStudents(rCtx, content, errorCh, wg, url, true)
// L.Warnf("total pages found %d", pageSub.Length()+1)
}
content <- studentDetails{
title: "title",
link: "link",
}
}
// helper function to allow using WaitGroup in a select
func wrapWait(wg *sync.WaitGroup) <-chan struct{} {
out := make(chan struct{})
go func() {
wg.Wait()
out <- struct{}{}
}()
return out
}
Спасибо, я использую тот же фрагмент для управления своим for
утверждением.
wg.Add(1)
go func(){
defer wg.Done()
// I do not think that you need a recursive function.
// this function overcomplicated.
s.getDetailStudents(rCtx, studentCh , errorCh, &wg, s.Link, false)
}(...)
wg.Add(1)
go func(ch chan studentDetail, e chan error) {
defer wg.Done()
...
}(...)
wg.Wait()
close(studentCh)
close(errorCh)
Это должно решить проблему. Функция s.getDetailStudents должна быть упрощена. Делать его рекурсивным не имеет никакой пользы.
Я не уверен, что здесь происходит, но мне легче поддерживать группу ожидания, когда
wg.Add(1)
и соответствующийdefer wg.Done()
появляются очень близко друг к другу в коде.