Обновление фрагмента горутины

У меня возникли проблемы с попыткой обновить фрагмент параллельных горутин. Изначально я придумал этот код (я знал, что это неправильный способ сделать это!):

func main() {

    var dwpList = []int {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

    var mmrpList []int
    
    var wg sync.WaitGroup

    ch1 := make(chan int)

    cores := 4

    wg.Add(cores + 1)

    for i := 0; i < cores; i++ {
        go func1(&mmrpList, ch1, &wg)
    }

    iterateSamples(dwpList, ch1, &wg)

    close(ch1)

    wg.Wait()

    fmt.Println(mmrpList)

}

func iterateSamples(dwpList []int, ch1 chan<- int, wg *sync.WaitGroup) {

    defer wg.Done()

    for _, dwp := range dwpList {
    
        ch1 <- dwp
    }
}

func func1(mmrpList *[]int, ch1 <-chan int, wg *sync.WaitGroup) {

    defer wg.Done()

    for dwp := range ch1 {
    
        *mmrpList = append(*mmrpList, dwp)
    }
}

Очевидно, что это не работает, поскольку к срезу одновременно добавляются несколько горутин.

Проблема в том, что всякий раз, когда я пытаюсь переместить добавление в его собственную функцию, управляемую дополнительными каналами, группами ожидания и т. д., я всегда сталкиваюсь с проблемами компиляции или «тупиками» (которые не всегда являются тупиками, а проблемой wg Wait или Done), или он снова становится однопоточным.

Не могли бы вы подсказать мне, как этого следует достичь?


Я попытался применить стиль Бурака к своему коду и получил следующее:

func main() {

    var dwpList = []int {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
    var mmrpList []int
    
    var wg sync.WaitGroup

    ch1 := make(chan int)
    ch2 := make(chan int)

    cores := 4

    wg.Add(1)
    go func() {
        defer wg.Done()
        // Get data
        for _, dwp := range dwpList {
            ch1 <- dwp
        }
        close(ch1)
    }()

    for i := 0; i < cores; i++ {

        wg.Add(1)
        go func() {
            defer wg.Done()
            // Do stuff then pass data on
            for dwp := range ch1 {
                ch2 <- dwp
            }
        }()
    }

    go func() {
        wg.Wait()
        close(ch2)
    }()

    for dwp := range ch2 {
        mmrpList = append(mmrpList, dwp)
    }

    fmt.Println(mmrpList)

}

Окончательно! (Однако пришлось сделать запись в ch1 одним процессом)

Закройте ch1, когда вернетесь из первой горутины. Вторая горутина зависает в ожидании чтения из нее.

Burak Serdar 24.07.2024 21:43

@Бурак Извините, нет, все еще не понял. Я еще раз упростил пример ответа, так что теперь единственные горутины пишут 1 -> 2. Закрытие ch1 после записи в него. Все еще тупик.

shakeshuck 24.07.2024 23:39

Нет, вы слишком упростили ситуацию. Ваше предыдущее решение было в порядке, единственное, что вам нужно было сделать, это закрыть канал после того, как все написали. Теперь он блокируется, потому что вы пишете в канал, а горутины не читают из него.

Burak Serdar 24.07.2024 23:55

@Burak В конце концов добрался до цели, но я не знаю, как это можно было бы сделать, если бы я продолжал выполнять несколько операций записи горутины в ch1?

shakeshuck 25.07.2024 02:35

Попробуйте решить ее и задайте отдельный вопрос, если не можете это сделать.

Burak Serdar 25.07.2024 03:39
Создание API ввода вопросов на разных языках программирования (Python, PHP, Go и Node.js)
Создание API ввода вопросов на разных языках программирования (Python, PHP, Go и Node.js)
API ввода вопросов - это полезный инструмент для интеграции моделей машинного обучения, таких как ChatGPT, в приложения, требующие обработки...
1
5
91
3
Перейти к ответу Данный вопрос помечен как решенный

Ответы 3

Срез распределяется между несколькими горутинами. При обновлении среза необходимо обеспечить взаимное исключение. Другими словами, это должно работать:

func func1(mmrpList *[]int, mmrpLock *sync.Mutex, ch1 <-chan int, wg *sync.WaitGroup) {

    defer wg.Done()

    for dwp := range ch1 {
        mmrpLock.Lock()
        *mmrpList = append(*mmrpList, dwp)
        mmrpLock.Unlock()
    }
}

Это также означает, что вам нужно создать один мьютекс и передать его всем горутинам.

Тем не менее, обычно это не то, как вы хотите это сделать. Просто нет смысла добавлять несколько горутин к одному фрагменту. Это не то, что вы хотите выполнять одновременно из-за последовательного характера добавления к срезу.

Что вы можете сделать, так это иметь одну горутину, которая добавляется к срезу, но несколько горутин, записывающих в канал, что-то вроде:

// Start multiple goroutines to write to ch1
wg:=sync.WaitGroup{}
for i:=0;i<n;i++ {
   wg.Add(1)
   go func() {
      defer wg.Done()
      generateData(ch1)
   }()
}

// Wait for all writers to complete and close the channel
go func() {
   wg.Wait()
   close(ch1)
}()

for data:=range ch1 {
   mmrpList=append(mmrpList,data)
}

Таким образом, вам не потребуется сериализовать доступ к срезу.

Ответ принят как подходящий

Возможно, вы ищете трубопроводы ( Go Playground):

package main

import (
    "fmt"
)

func main() {
    dwpList := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

    ch1 := iterateSamples(dwpList)

    mmrpList := func1(ch1)

    fmt.Println(mmrpList)
}

func iterateSamples(dwpList []int) <-chan int {
    ch1 := make(chan int)

    go func() {
        for _, dwp := range dwpList {
            ch1 <- dwp
        }
        close(ch1)
    }()

    return ch1
}

func func1(ch1 <-chan int) []int {
    var mmrpList []int

    for dwp := range ch1 {
        mmrpList = append(mmrpList, dwp)
    }

    return mmrpList
}

Это читается довольно четко и не блокируется.

Если вы хотите собрать входные данные из нескольких источников (или распределить работу между несколькими работниками), см. главу «Разветвление, разветвление».

Хотя блокировки сами по себе неплохи, наличие нескольких горутин, конкурирующих за один и тот же ресурс (срез), защищенный блокировкой, может привести к худшей производительности, чем выделение одной горутины для выполнения задания.

См. также «Переосмысление классических шаблонов параллелизма».


Если вы хотите перебрать несколько списков, просто используйте merge из «Fan-out, fan-in » ( Go Playground):

package main

import (
    "fmt"
    "sync"
)

func main() {
    dwpList1 := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
    dwpList2 := []int{11, 12, 13, 14, 15, 16, 17, 18, 19, 20}

    ch1 := iterateSamples(dwpList1)
    ch2 := iterateSamples(dwpList2)

    ch := merge(ch1, ch2)

    mmrpList := func1(ch)

    fmt.Println(mmrpList)
}

Эти шаблоны легко компонуются.

Похоже, что часть моей проблемы заключалась (есть!) в том, где найти закрытие. В вашем примере (если я правильно понял) попытка заставить iterateSamples выполняться параллельно приводит к тому, что одна горутина закрывает канал до того, как другие завершат работу. Это была моя дилемма. Установка его после ожидания привела к тупику. С моей точки зрения, это была «Уловка-22». Спасибо за ссылки - я их не нашел.

shakeshuck 25.07.2024 10:19

Если вы хотите сделать несколько iterateSamples, запустите несколько горутин с отдельным каналом для каждой и используйте слияние из «fan-in».

eik 25.07.2024 11:12

вы можете реализовать срез с помощью пакета sync.mutex, чтобы избежать состояния гонки и одновременного доступа к общим ресурсам в нескольких горутинах.


import (
    "fmt"
    "sync"
)

type myList struct {
    list []int
    lock *sync.Mutex
}

func (l *myList) Add(number int) {
    l.lock.Lock()
    defer l.lock.Unlock()
    l.list = append(l.list, number)
}

func (l myList) GetList() []int {
    l.lock.Lock()
    defer l.lock.Unlock()
    return l.list
}

func main() {
    dwpList := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}
    mmrpList := myList{
        list: []int{},
        lock: new(sync.Mutex),
    }

    var listchan = make(chan int)
    defer close(listchan)

    for i := 0; i <= 5; i++ {
        go func() {
            for num := range listchan {
                mmrpList.Add(num)
            }
        }()
    }

    for _, num := range dwpList {
        listchan <- num
    }

    fmt.Println(mmrpList.GetList())
}

в этом коде вы можете добавить элемент в срез в 5 одновременных горутинах и получить список в конце после завершения добавления элементов.

Другие вопросы по теме