Синхронизировать буферизованный канал и группу ожидания

У меня возникла проблема при использовании waitgroup с каналом buffered. Проблема в том, что waitgroup закрывается до того, как канал будет прочитан полностью, из-за чего мой канал прочитан наполовину и прерывается между ними.

func main() {
    var wg sync.WaitGroup
    var err error

    start := time.Now()
    students := make([]studentDetails, 0)
    studentCh := make(chan studentDetail, 10000)
    errorCh := make(chan error, 1)

    wg.Add(1)

    go s.getDetailStudents(rCtx, studentCh , errorCh, &wg, s.Link, false)
    go func(ch chan studentDetail, e chan error) {
    
    LOOP:
        for {
            select {
            case p, ok := <-ch:
                if ok {
                    L.Printf("Links %s: [%s]\n", p.title, p.link)
                    students = append(students, p)
                } else {
                    L.Print("Closed channel")
                    break LOOP
                }
            case err = <-e:
                if err != nil {
                    break
                }
            }
        }
    }(studentCh, errorCh)
    wg.Wait()
    close(studentCh)
    close(errorCh)
    L.Warnln("closed: all wait-groups completed!")
    L.Warnf("total items fetched: %d", len(students))

    elapsed := time.Since(start)
    L.Warnf("operation took %s", elapsed)
}

Проблема в том, что эта функция recursive. Я имею в виду несколько http call to fetch students, а затем сделать больше звонков в зависимости от состояния.

func (s Student) getDetailStudents(rCtx context.Context, content chan<- studentDetail, errorCh chan<- error, wg *sync.WaitGroup, url string, subSection bool) {
    util.MustNotNil(rCtx)
    L := logger.GetLogger(rCtx)
    defer func() {
        L.Println("Closing all waitgroup!")
        wg.Done()
    }()

    wc := getWC()
    httpClient := wc.Registry.MustHTTPClient()
    res, err := httpClient.Get(url)
    if err != nil {
        L.Fatal(err)
    }
    defer res.Body.Close()
    if res.StatusCode != 200 {
        L.Errorf("status code error: %d %s", res.StatusCode, res.Status)
        errorCh <- errors.New("service_status_code")
        return
    }

    // parse response and return error if found some through errorCh as done above.
    // decide page subSection based on response if it is more.
    if !subSection {
        wg.Add(1)
        go s.getDetailStudents(rCtx, content, errorCh, wg, link, true)
        // L.Warnf("total pages found %d", pageSub.Length()+1)
    }

    // Find students from response list and parse each Student
    students := s.parseStudentItemList(rCtx, item)
    for _, student := range students {
        content <- student
    }
 
    L.Warnf("Calling HTTP Service for %q with total %d record", url, elementsSub.Length())
}

Переменные изменены, чтобы избежать оригинальной кодовой базы.

Проблема в том, что студенты считываются случайным образом, как только группа ожидания завершает работу. Я ожидаю, что выполнение будет выполнено до тех пор, пока все студенты не будут прочитаны. В случае ошибки оно должно прерваться, как только возникнет ошибка.

Я не уверен, что здесь происходит, но мне легче поддерживать группу ожидания, когда wg.Add(1) и соответствующий defer wg.Done() появляются очень близко друг к другу в коде.

Lars Christian Jensen 20.01.2023 10:23

Это хорошая практика, чтобы использовать его рядом с defer wg.Done(), но я все еще не уверен, что это так. Но я думаю, что если код имеет задержку при записи в канал, то он должен ждать, пока все значения каналов не будут прочитаны.

Sankalp 20.01.2023 10:36
Создание API ввода вопросов на разных языках программирования (Python, PHP, Go и Node.js)
Создание API ввода вопросов на разных языках программирования (Python, PHP, Go и Node.js)
API ввода вопросов - это полезный инструмент для интеграции моделей машинного обучения, таких как ChatGPT, в приложения, требующие обработки...
1
2
129
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Ответ принят как подходящий

Вам нужно знать, когда завершается приемная горутина. WaitGroup делает это для генерирующей горутины. Таким образом, вы можете использовать две группы ожидания:

wg.Add(1)
go s.getDetailStudents(rCtx, studentCh , errorCh, &wg, s.Link, false)
wgReader.Add(1)
go func(ch chan studentDetail, e chan error) {
    defer wgReader.Done()
    ...
}
wg.Wait()
close(studentCh)
close(errorCh)
wgReader.Wait() // Wait for the readers to complete

Это не сработает. Проблема не в том, чтобы разделить 2 goroutines на разные waitgroup. Поскольку goroutines является рекурсивным, поэтому не нужно проверять, когда он будет закрыт. Проблема в том, что waitgroup закрывается перед записью на канал. Я мог бы ограничить проблему, уменьшив размер буфера канала, но этого не ожидается,

Sankalp 12.02.2023 10:33

Первая группа ожидания ждет, пока не закончатся писатели. Второй необходимо дождаться, пока читатель не сделает.

Burak Serdar 12.02.2023 17:00

Спасибо понял проблему. Я не мог контролировать close с помощью одного waitgroup. У меня есть 2 отдельных goroutines, и теперь я могу контролировать выполнение каждого фрагмента кода.

Sankalp 14.02.2023 19:23

Поскольку вы используете буферизованные каналы, вы можете получить оставшиеся значения после закрытия канала. Вам также понадобится механизм, предотвращающий слишком раннее завершение вашей основной функции, пока читатель все еще выполняет работу, как посоветовал @Burak Serdar.

Я реструктурировал код, чтобы дать рабочий пример, но он должен донести суть.

package main

import (
    "context"
    "log"
    "sync"
    "time"
)

type studentDetails struct {
    title string
    link  string
}

func main() {
    var wg sync.WaitGroup
    var err error
    students := make([]studentDetails, 0)
    studentCh := make(chan studentDetails, 10000)
    errorCh := make(chan error, 1)

    start := time.Now()
    wg.Add(1)
    go getDetailStudents(context.TODO(), studentCh, errorCh, &wg, "http://example.com", false)

LOOP:
    for {
        select {
        case p, ok := <-studentCh:
            if ok {
                log.Printf("Links %s: [%s]\n", p.title, p.link)
                students = append(students, p)
            } else {
                log.Println("Draining student channel")
                for p := range studentCh {
                    log.Printf("Links %s: [%s]\n", p.title, p.link)
                    students = append(students, p)
                }
                break LOOP
            }
        case err = <-errorCh:
            if err != nil {

                break LOOP
            }
        case <-wrapWait(&wg):
            close(studentCh)
        }
    }
    close(errorCh)
    elapsed := time.Since(start)
    log.Printf("operation took %s", elapsed)
}

func getDetailStudents(rCtx context.Context, content chan<- studentDetails, errorCh chan<- error, wg *sync.WaitGroup, url string, subSection bool) {
    defer func() {
        log.Println("Closing")
        wg.Done()
    }()
    if !subSection {
        wg.Add(1)
        go getDetailStudents(rCtx, content, errorCh, wg, url, true)
        // L.Warnf("total pages found %d", pageSub.Length()+1)
    }
    content <- studentDetails{
        title: "title",
        link:  "link",
    }
}

// helper function to allow using WaitGroup in a select
func wrapWait(wg *sync.WaitGroup) <-chan struct{} {
    out := make(chan struct{})
    go func() {
        wg.Wait()
        out <- struct{}{}
    }()
    return out
}

Спасибо, я использую тот же фрагмент для управления своим for утверждением.

Sankalp 14.02.2023 19:24
wg.Add(1)
go func(){
    defer wg.Done()
    // I do not think that you need a recursive function.
    // this function overcomplicated.
    s.getDetailStudents(rCtx, studentCh , errorCh, &wg, s.Link, false)
}(...)

wg.Add(1)
go func(ch chan studentDetail, e chan error) {
    defer wg.Done()
    ...
}(...)

wg.Wait()
close(studentCh)
close(errorCh)

Это должно решить проблему. Функция s.getDetailStudents должна быть упрощена. Делать его рекурсивным не имеет никакой пользы.

Другие вопросы по теме