Срез, измененный между горутинами с мьютексом, не показывает правильную синхронизацию

Я новичок, но раньше работал с параллелизмом. У меня проблема с разделением фрагмента между несколькими горутинами, не содержащими одинаковых данных между всеми горутинами. Я также использую мьютекс для блокировки структуры при изменении среза, но, похоже, это не помогает. Я прикрепил свой код и хотел бы знать, что я делаю неправильно, спасибо за любую помощь!

type State struct {
    waiting int32
    processing int32
    completed int32
}

type Scheduler struct {
    sync.Mutex
    items chan interface{}
    backPressure []interface{}
    capacity int
    canceler context.CancelFunc
    state State
}

func NewScheduler(capacity int, handler func(interface {}) (interface{}, error)) Scheduler {
    ctx, cancel := context.WithCancel(context.Background())

    state := State{}

    atomic.StoreInt32(&state.waiting, 0)
    atomic.StoreInt32(&state.processing, 0)
    atomic.StoreInt32(&state.completed, 0)

    scheduler := Scheduler{
        items: make(chan interface{}, capacity),
        backPressure: make([]interface{}, 0),
        capacity: capacity,
        canceler: cancel,
        state: state,
    }

    scheduler.initializeWorkers(ctx, handler)

    return scheduler
}

func (s *Scheduler) initializeWorkers(ctx context.Context, handler func(interface {}) (interface{}, error)) {
    for i := 0; i < 5; i++ {
        go s.newWorker(ctx, handler)
    }
}

func (s *Scheduler) newWorker(ctx context.Context, handler func(interface {}) (interface{}, error)) {
    backoff := 0

    for {
        select {
        case <-ctx.Done():
            return
        case job := <- s.items:
            atomic.AddInt32(&s.state.waiting, -1)
            atomic.AddInt32(&s.state.processing, 1)
            job, _ = handler(job)
            backoff = 0
            atomic.AddInt32(&s.state.processing, -1)
            atomic.AddInt32(&s.state.completed, 1)
        default:
            backoff += 1
            s.CheckBackPressure()
            time.Sleep(time.Duration(backoff * 10) * time.Millisecond)
        }
    }
}

func (s *Scheduler) AddItem(item interface{}) {
    atomic.AddInt32(&s.state.waiting, 1)

    if len(s.items) < s.capacity {
        select {
        case s.items <- item:
            return
        }
    }

    s.Lock()
    defer s.Unlock()

    s.backPressure = append(s.backPressure, item)

    fmt.Printf("new backpressure len %v \n", len(s.backPressure))

    return
}

func (s *Scheduler) Process() {
    var wg sync.WaitGroup

    wg.Add(1)


    go func() {
        defer wg.Done()

        for {
            if atomic.LoadInt32(&s.state.waiting) == 0 && atomic.LoadInt32(&s.state.processing) == 0 {
                return
            }
            runtime.Gosched()
        }
    }()

    wg.Wait()
}

func (s *Scheduler) CheckBackPressure() {
    s.Lock()
    defer s.Unlock()

    if len(s.backPressure) == 0 || s.capacity <= len(s.items) {
        fmt.Printf("backpressure = %d  :: len = %d cap = %d \n", len(s.backPressure), len(s.items), s.capacity)
        return
    }

    fmt.Printf("releasing backpressure \n")

    job, tmp := s.backPressure[0], s.backPressure[1:]

    s.backPressure = tmp

    s.items <- job
    return
}

func (s *Scheduler) Stop() {
    s.canceler()
}

Это код, который я использую для проверки функциональности:

type Job struct {
    Value int
}

func TestSchedulerExceedingCapacity(t *testing.T) {


    handler := func (ptr interface{}) (interface{}, error) {
        job, ok := (ptr).(*Job)

        if ok != true {
            return nil, errors.New("failed to convert job")
        }

        // simulate work
        time.Sleep(50 * time.Millisecond)

        return job, nil
    }

    scheduler := NewScheduler(5, handler)

    for i := 0; i < 25; i++ {
        scheduler.AddItem(&(Job { Value: i }))
    }

    fmt.Printf("PROCESSING\n")
    scheduler.Process()
    fmt.Printf("FINISHED\n")
}

Когда я обновляю срез, который удерживает обратное давление, кажется, что он был правильно обновлен, печатая new backpressure len 1 для 1-16.

Однако, когда я проверяю противодавление у рабочего, это указывает на то, что срез противодавления пуст. backpressure = 0 :: len = 0 cap = 5.

Также «снятие противодавления» никогда не выводится на стандартный вывод.

Вот дополнительный вывод...

=== RUN   TestSchedulerExceedingCapacity
new backpressure len 1 
new backpressure len 2 
new backpressure len 3 
new backpressure len 4 
new backpressure len 5 
new backpressure len 6 
new backpressure len 7 
new backpressure len 8 
backpressure = 0  :: len = 0 cap = 5 
new backpressure len 9 
new backpressure len 10 
new backpressure len 11 
new backpressure len 12 
new backpressure len 13 
new backpressure len 14 
new backpressure len 15 
new backpressure len 16 
PROCESSING
backpressure = 0  :: len = 0 cap = 5 
backpressure = 0  :: len = 0 cap = 5 
backpressure = 0  :: len = 0 cap = 5 
...

Если я не убью тест, он бесконечно печатает backpressure = 0 :: len = 0 cap = 5

Я предполагаю, что я неправильно синхронизирую изменения, я был бы ДЕЙСТВИТЕЛЬНО признателен за любую информацию, спасибо!

Вам никогда не нужно вызывать runtime.Gosched(), если только вы не написали замкнутый цикл без вызовов функций. Планировщик сам по себе работает довольно хорошо в 99,9% случаев.

Adrian 30.05.2019 23:48

Спасибо за информацию - у меня была проблема с атомными целыми числами, которые не обновлялись, когда я их не вызывал. ИЛИ они обновлялись, но неправильно выводились на стандартный вывод. Я добавил сон с отсрочкой опыта или runtime.Gosched(). Однако использование отсрочки опыта не исправляет синхронизацию.

kyle 30.05.2019 23:56
Создание API ввода вопросов на разных языках программирования (Python, PHP, Go и Node.js)
Создание API ввода вопросов на разных языках программирования (Python, PHP, Go и Node.js)
API ввода вопросов - это полезный инструмент для интеграции моделей машинного обучения, таких как ChatGPT, в приложения, требующие обработки...
3
2
49
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Хорошо, я смог понять это, конечно, как только я разместил вопрос...

Я где-то видел, что предлагалось запустить тест с опцией -race, которая включает детектор гонки данных. Я сразу же получил ошибки, которые помогли упростить отладку проблемы.

Оказывается, проблема была связана с возвратом значения NewScheduler, а не указателя нового планировщика. Я изменил эту функцию на следующий код, который устранил проблему.

func NewScheduler(capacity int, handler func(interface {}) (interface{}, error)) *Scheduler {
    ctx, cancel := context.WithCancel(context.Background())

    state := State{}

    atomic.StoreInt32(&state.waiting, 0)
    atomic.StoreInt32(&state.processing, 0)
    atomic.StoreInt32(&state.completed, 0)
    atomic.StoreInt32(&state.errors, 0)

    scheduler := Scheduler{
        items: make(chan interface{}, capacity),
        backPressure: make([]interface{}, 0),
        capacity: capacity,
        canceler: cancel,
        state: state,
    }

    scheduler.initializeWorkers(ctx, handler)

    return &scheduler
}

Другие вопросы по теме