Обработка большого CSV-файла и ограничение горутин

Я пытаюсь найти лучший эффективный способ чтения файла csv (строка ~ 1M). Каждая строка содержит HTTP-ссылку на изображение, которое мне нужно загрузить.

Это мой текущий код с использованием рабочих пулов:

func worker(queue chan []string, worknumber int, done, ks chan bool) {
    for true {
        select {
        case url := <-queue:
            fmt.Println("doing work!", url, "worknumber", worknumber)
            processData(url) // HTTP download
            done <- true
        case <-ks:
            fmt.Println("worker halted, number", worknumber)
            return
        }
    }
}

func main() {
    start := time.Now()
    flag.Parse()
    fmt.Print(strings.Join(flag.Args(), "\n"))
    if *filename == "REQUIRED" {
        return
    }

    csvfile, err := os.Open(*filename)
    if err != nil {
        fmt.Println(err)
        return
    }
    count, _ := lineCounter(csvfile)
    fmt.Printf("Total count: %d\n", count)
    csvfile.Seek(0, 0)

    defer csvfile.Close()

    //bar := pb.StartNew(count)
    bar := progressbar.NewOptions(count)
    bar.RenderBlank()

    reader := csv.NewReader(csvfile)

    //channel for terminating the workers
    killsignal := make(chan bool)

    //queue of jobs
    q := make(chan []string)
    // done channel takes the result of the job
    done := make(chan bool)

    numberOfWorkers := *numChannels
    for i := 0; i < numberOfWorkers; i++ {
        go worker(q, i, done, killsignal)
    }

    i := 0
    for {
        record, err := reader.Read()
        if err == io.EOF {
            break
        } else if err != nil {
            fmt.Println(err)
            return
        }
        i++

        go func(r []string, i int) {
            q <- r
            bar.Add(1)
        }(record, i)
    }

    // a deadlock occurs if c >= numberOfJobs
    for c := 0; c < count; c++ {
        <-done
    }

    fmt.Println("finished")

    // cleaning workers
    close(killsignal)
    time.Sleep(2 * time.Second)

    fmt.Printf("\n%2fs", time.Since(start).Seconds())
}

Моя проблема в том, что он открывает много горутин, использует всю память и вылетает.

Что было бы лучшим способом ограничить это?

Вы думаете о bufio.NewScanner(file), это не будет использовать слишком много памяти.

PumpkinSeed 27.05.2019 13:57

ваш цикл по строкам cvs должен быть в разных процедурах. основная подпрограмма должна нести ответственность за вывод рабочих операций и ждать завершения или ошибки для выхода.

mh-cbon 27.05.2019 14:01
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
2
2
2 495
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Вы создаете новую горутину для каждой строки в файле. Поэтому. Нет причин делать это, если у вас уже есть нужные вам работники.

Короче говоря, измените это:

    go func(r []string, i int) {
        q <- r
        bar.Add(1)
    }(record, i)

к этому:

    q <- record
    bar.Add(1)

Я пытался, но затем, кажется, входит в тупик. Он начинает обрабатывать numberOfWorkers количество строк, а затем зависает.

Ilan 27.05.2019 13:57

происходит потому, что ничто не читает выходные данные ваших рабочих процессов, пока вы полностью не обработаете ввод.

mh-cbon 27.05.2019 13:59

Не могли бы вы помочь мне понять больше о вашем комментарии?

Ilan 27.05.2019 14:24

Ваши работники пишут в канал done, но это не читается до конца программы. Именно это приводит к тупику.

Flimzy 27.05.2019 14:36
Ответ принят как подходящий

Я вырезал индикатор выполнения, так как не хотел об этом беспокоиться, но в целом это ближе к тому, что вы ищете.

Он на самом деле не обрабатывает ошибки, они просто переходят в фатальное состояние.

Я добавил поддержку контекста и отмены.

Возможно, вы захотите проверить https://godoc.org/golang.org/x/sync/errgroup#Group.Go

В качестве общей рекомендации вам необходимо изучить шаблоны golang и их использование.

Очевидно, что вы недостаточно работали или находитесь в процессе обучения.

Это не самая быстрая программа, но она делает свою работу.

Это всего лишь набросок, чтобы вернуть вас в нужное русло.

package main

import (
    "context"
    "encoding/csv"
    "flag"
    "fmt"
    "io"
    "log"
    "os"
    "os/signal"
    "sync"
    "time"
)

func worker(ctx context.Context, dst chan string, src chan []string) {
    for {
        select {
        case url, ok := <-src: // you must check for readable state of the channel.
            if !ok {
                return
            }
            dst <- fmt.Sprintf("out of %v", url) // do somethingg useful.
        case <-ctx.Done(): // if the context is cancelled, quit.
            return
        }
    }
}

func main() {

    // create a context
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    // that cancels at ctrl+C
    go onSignal(os.Interrupt, cancel)

    // parse command line arguments
    var filename string
    var numberOfWorkers int
    flag.StringVar(&filename, "filename", "", "src file")
    flag.IntVar(&numberOfWorkers, "c", 2, "concurrent workers")
    flag.Parse()

    // check arguments
    if filename == "" {
        log.Fatal("filename required")
    }

    start := time.Now()

    csvfile, err := os.Open(filename)
    if err != nil {
        log.Fatal(err)
    }
    defer csvfile.Close()

    reader := csv.NewReader(csvfile)

    // create the pair of input/output channels for the controller=>workers com.
    src := make(chan []string)
    out := make(chan string)

    // use a waitgroup to manage synchronization
    var wg sync.WaitGroup

    // declare the workers
    for i := 0; i < numberOfWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            worker(ctx, out, src)
        }()
    }

    // read the csv and write it to src
    go func() {
        for {
            record, err := reader.Read()
            if err == io.EOF {
                break
            } else if err != nil {
                log.Fatal(err)
            }
            src <- record // you might select on ctx.Done().
        }
        close(src) // close src to signal workers that no more job are incoming.
    }()

    // wait for worker group to finish and close out
    go func() {
        wg.Wait() // wait for writers to quit.
        close(out) // when you close(out) it breaks the below loop.
    }()

    // drain the output
    for res := range out {
        fmt.Println(res)
    }

    fmt.Printf("\n%2fs", time.Since(start).Seconds())
}

func onSignal(s os.Signal, h func()) {
    c := make(chan os.Signal, 1)
    signal.Notify(c, s)
    <-c
    h()
}

буферизованный канал может помочь вам ограничить горутины

var taskPipe = make(chan interface{}, 5)

func main(){
    go func() {
        taskPipe <- nil
        sleep
    }()
}

func sleep() {
    time.Sleep(time.Second * 5)
    <- taskPipe
}

Другие вопросы по теме