Паника: отправка по закрытому каналу при выполнении подпрограммы go в цикле foor

Я пытаюсь сделать параллельную версию grep. Программа просматривает каталоги/подкаталоги и возвращает все совпадающие строки с предоставленным шаблоном.

Я пытаюсь запустить поиск файлов одновременно, когда у меня есть все файлы для поиска (см. функцию searchPaths). Первоначально я получал:

fatal error: all goroutines are asleep - deadlock

Пока я не добавил close(out) в конце searchPaths, к которому он теперь возвращается:

Panic: Send on a closed channel when running go routine in foor loop

Я пытаюсь реализовать что-то похожее на:

https://go.dev/blog/pipelines#fan-out-fan-in

Дело в том, что я закрываю канал не в том месте?

package main

import (
    "fmt"
    "io/fs"
    "io/ioutil"
    "log"
    "os"
    "path/filepath"
    "strings"
    "sync"
)

type SearchResult struct {
    line       string
    lineNumber int
}

type Display struct {
    filePath string
    SearchResult
}

var wg sync.WaitGroup

func (d Display) PrettyPrint() {
    fmt.Printf("Line Number: %v\nFilePath: %v\nLine: %v\n\n", d.lineNumber, d.filePath, d.line)
}

func searchLine(pattern string, line string, lineNumber int) (SearchResult, bool) {
    if strings.Contains(line, pattern) {
        return SearchResult{lineNumber: lineNumber + 1, line: line}, true
    }
    return SearchResult{}, false
}

func splitIntoLines(file string) []string {
    lines := strings.Split(file, "\n")
    return lines
}

func fileFromPath(path string) string {
    fileContent, err := ioutil.ReadFile(path)

    if err != nil {
        log.Fatal(err)
    }

    return string(fileContent)
}

func getRecursiveFilePaths(inputDir string) []string {
    var paths []string
    err := filepath.Walk(inputDir, func(path string, info fs.FileInfo, err error) error {
        if err != nil {
            fmt.Printf("prevent panic by handling failure accessing a path %q: %v\n", path, err)
            return err
        }
        if !info.IsDir() {
            paths = append(paths, path)
        }
        return nil
    })
    if err != nil {
        fmt.Printf("Error walking the path %q: %v\n", inputDir, err)
    }
    return paths
}

func searchPaths(paths []string, pattern string) <-chan Display {
    out := make(chan Display)

    for _, path := range paths {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for _, display := range searchFile(path, pattern) {
                out <- display
            }
        }()
    }
    close(out)
    return out
}

func searchFile(path string, pattern string) []Display {
    var out []Display
    input := fileFromPath(path)
    lines := splitIntoLines(input)
    for index, line := range lines {
        if searchResult, ok := searchLine(pattern, line, index); ok {
            out = append(out, Display{path, searchResult})
        }
    }
    return out
}

func main() {
    pattern := os.Args[1]
    dirPath := os.Args[2]

    paths := getRecursiveFilePaths(dirPath)

    out := searchPaths(paths, pattern)
    wg.Wait()
    for d := range out {
        d.PrettyPrint()
    }

}

Объект-отправитель должен закрыть канал именно для того, чтобы избежать отправки по закрытому каналу (что вызывает панику во время выполнения). Если есть несколько отправителей, они должны быть скоординированы, и канал закрывается только тогда, когда все отправители готовы. Ваш wg.Wait() "неуместен". Смотрите: Закрытие канала неизвестной длины

icza 10.11.2022 12:45
close(out); return out — это немедленный красный флаг: нет смысла возвращать канал, который был только что закрыт и поэтому не может быть использован.
Adrian 10.11.2022 16:00
Создание API ввода вопросов на разных языках программирования (Python, PHP, Go и Node.js)
Создание API ввода вопросов на разных языках программирования (Python, PHP, Go и Node.js)
API ввода вопросов - это полезный инструмент для интеграции моделей машинного обучения, таких как ChatGPT, в приложения, требующие обработки...
1
2
134
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

2 основные проблемы с этим кодом были

  1. вам нужно закрыть канал только после завершения wg.Wait(). вы можете сделать это в отдельной горутине, как показано ниже
  2. поскольку path var в функции searchPaths переназначается несколько раз как часть логики цикла for, это not a good practice to use that var directly in the goroutines, лучшим подходом будет передача ее в качестве аргумента.
package main

import (
    "fmt"
    "io/fs"
    "io/ioutil"
    "log"
    "os"
    "path/filepath"
    "strings"
    "sync"
)

type SearchResult struct {
    line       string
    lineNumber int
}

type Display struct {
    filePath string
    SearchResult
}

var wg sync.WaitGroup

func (d Display) PrettyPrint() {
    fmt.Printf("Line Number: %v\nFilePath: %v\nLine: %v\n\n", d.lineNumber, d.filePath, d.line)
}

func searchLine(pattern string, line string, lineNumber int) (SearchResult, bool) {
    if strings.Contains(line, pattern) {
        return SearchResult{lineNumber: lineNumber + 1, line: line}, true
    }
    return SearchResult{}, false
}

func splitIntoLines(file string) []string {
    lines := strings.Split(file, "\n")
    return lines
}

func fileFromPath(path string) string {
    fileContent, err := ioutil.ReadFile(path)

    if err != nil {
        log.Fatal(err)
    }

    return string(fileContent)
}

func getRecursiveFilePaths(inputDir string) []string {
    var paths []string
    err := filepath.Walk(inputDir, func(path string, info fs.FileInfo, err error) error {
        if err != nil {
            fmt.Printf("prevent panic by handling failure accessing a path %q: %v\n", path, err)
            return err
        }
        if !info.IsDir() {
            paths = append(paths, path)
        }
        return nil
    })
    if err != nil {
        fmt.Printf("Error walking the path %q: %v\n", inputDir, err)
    }
    return paths
}

func searchPaths(paths []string, pattern string) chan Display {
    out := make(chan Display)
    for _, path := range paths {
        wg.Add(1)
        go func(p string, w *sync.WaitGroup) { // as path var is changing value in the loop, it's better to provide it as a argument in goroutine
            defer w.Done()
            for _, display := range searchFile(p, pattern) {
                out <- display
            }
        }(path, &wg)
    }
    return out
}

func searchFile(path string, pattern string) []Display {
    var out []Display
    input := fileFromPath(path)
    lines := splitIntoLines(input)
    for index, line := range lines {
        if searchResult, ok := searchLine(pattern, line, index); ok {
            out = append(out, Display{path, searchResult})
        }
    }
    return out
}

func main() {
    pattern := os.Args[1]
    dirPath := os.Args[2]

    paths := getRecursiveFilePaths(dirPath)

    out := searchPaths(paths, pattern)

    go func(){
        wg.Wait() // waiting before closing the channel
        close(out)
    }()
    
    count := 0
    for d := range out {
        fmt.Println(count)
        d.PrettyPrint()
        count += 1
    }

}

Спасибо за это объяснение! Не могли бы вы объяснить, почему функции wg.Wait и close должны находиться в своей собственной процедуре go, а не только в основной функции?

Sheen 11.11.2022 11:49
All the channel operations are blocking in nature, поэтому в основном, если данные, отправленные на канал, не получены (в этом случае принимается вызов диапазона), тогда подпрограмма, которая отправляет на этот канал, будет блокироваться на неопределенный срок (и наоборот). А также вызовы диапазона не будут выходить из цикла без закрытия канала, поэтому оба они не могут находиться в одном потоке/программе.
Anurag Kumar 11.11.2022 15:22

Другие вопросы по теме