Что не так с этой реализацией Барьера с использованием атомарности в Rust?

Я написал следующую реализацию барьера, используя только атомы:

use std::sync::atomic::{AtomicUsize, Ordering};

pub struct Barrier {
  pub done: AtomicUsize,
  pub tids: usize,
}

impl Barrier {
  pub fn new(tids: usize) -> Barrier {
    Barrier {
      done: AtomicUsize::new(0),
      tids,
    }
  }

  pub fn wait(&self) {
    let done = self.done.fetch_add(1, Ordering::SeqCst);
    if done + 1 == self.tids {
      self.done.store(0, Ordering::SeqCst);
    } else {
      while self.done.load(Ordering::SeqCst) != 0 {}
    }
  }
}

Это не работает, как ожидалось. Например,

// inside threads loop
barrier.wait();
println!("a");
barrier.wait();
println!("b");

Интуитивно это должно работать, так как после вызова .wait() он будет зависать в цикле while, вырываясь из него после того, как все потоки вызвали .wait(), и сбрасывая счетчик для следующего .wait(). Вместо этого, в конце концов, он зависнет. Ниже приведен пример использования:

fn main() {
  println!("Hello, world!");

  let barrier = &Barrier::new(10);

  std::thread::scope(|s| {
    for tid in 0 .. 10 {
      s.spawn(move || {
        loop {
          barrier.wait();
          println!("{} a", tid);
          barrier.wait();
          println!("{} b", tid);
        }
      });
    }
  });
}

@cafce25 добавил пример использования и исправил проблему с вопросом.

MaiaVictor 03.12.2022 20:31

Редактировать: о, теперь я вижу проблему. Он зависнет, потому что поток a может увеличить счетчик для следующего .wait() до того, как другой поток выйдет из цикла. Глупый. Хотя оставлю вопрос открытым.

MaiaVictor 03.12.2022 20:33

@cafce25 Сделаю, как только найду решение

MaiaVictor 03.12.2022 20:43

Да, именно в этом проблема. Одна итерация может смешиваться со следующей, один быстрый поток может блокировать другой поток из предыдущей итерации. Не гарантируется, что все потоки будут освобождены должным образом.

Finomnis 03.12.2022 21:42

Если вы всегда используете одни и те же потоки, вы можете решить эту проблему, установив второй счетчик «выполнено» и переключаясь между ними между итерациями.

Finomnis 03.12.2022 21:45
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
5
269
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Проблема в том, что существует состояние гонки между двумя последовательными барьерами:

  • Поток может стать незапланированным во время ожидания барьера.
  • Второй поток (который является последним потоком, которого ожидает барьер) входит в барьер, освобождает его, запускает следующую итерацию и снова входит в барьер.
  • Первый поток просыпается и видит значение 1, полностью пропуская снятие барьера.

Если вы уверены, что всегда используете одни и те же потоки, вы можете исправить это, используя два счетчика и переключаясь между ними. Таким образом, все потоки ждут либо первого, либо второго. Но у одного потока нет возможности обойти другие, так как ему придется пройти через второй счетчик, чтобы снова заблокировать первый, а второй разблокируется только в том случае, если в первом не осталось ни одного потока.

Этот, кажется, работает:

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

pub struct Barrier {
    pub done: [AtomicUsize; 2],
    pub use_first_done: AtomicBool,
    pub tids: usize,
}

impl Barrier {
    pub fn new(tids: usize) -> Barrier {
        Barrier {
            done: [AtomicUsize::new(0), AtomicUsize::new(0)],
            use_first_done: AtomicBool::new(true),
            tids,
        }
    }

    pub fn wait(&self) {
        let done = if self.use_first_done.load(Ordering::SeqCst) {
            &self.done[0]
        } else {
            &self.done[1]
        };

        let num_done = done.fetch_add(1, Ordering::SeqCst) + 1;
        if num_done == self.tids {
            self.use_first_done.fetch_xor(true, Ordering::SeqCst);
            done.store(0, Ordering::SeqCst);
        } else {
            while done.load(Ordering::SeqCst) != 0 {}
        }
    }
}

fn main() {
    println!("Hello, world!");

    let barrier = &Barrier::new(10);

    std::thread::scope(|s| {
        for tid in 0..10 {
            s.spawn(move || loop {
                barrier.wait();
                println!("{} a", tid);
                barrier.wait();
                println!("{} b", tid);
            });
        }
    });
}

Альтернативой может быть использование счетчика итераций.

По той же причине, по которой работает переключение между двумя счетчиками done, счетчика итераций из двух итераций (= логическое значение) должно быть достаточно.

Это работает и для меня:

use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

pub struct Barrier {
    pub done: AtomicUsize,
    pub iteration: AtomicBool,
    pub tids: usize,
}

impl Barrier {
    pub fn new(tids: usize) -> Barrier {
        Barrier {
            done: AtomicUsize::new(0),
            iteration: AtomicBool::new(false),
            tids,
        }
    }

    pub fn wait(&self) {
        let iteration = self.iteration.load(Ordering::SeqCst);
        let num_done = self.done.fetch_add(1, Ordering::SeqCst) + 1;
        if num_done == self.tids {
            self.done.store(0, Ordering::SeqCst);
            self.iteration.fetch_xor(true, Ordering::SeqCst);
        } else {
            while iteration == self.iteration.load(Ordering::SeqCst) {}
        }
    }
}

fn main() {
    println!("Hello, world!");

    let barrier = &Barrier::new(10);

    std::thread::scope(|s| {
        for tid in 0..10 {
            s.spawn(move || loop {
                barrier.wait();
                println!("{} a", tid);
                barrier.wait();
                println!("{} b", tid);
            });
        }
    });
}

ВАЖНО: Это работает, только если потоки всегда идентичны. Если этот барьер используют разные потоки, то необходимо иметь больший счетчик итераций.

Выглядит действительно хорошо, спасибо. В итоге я разработал эту версию на основе этого вопроса. Он использует дополнительную переменную для подсчета общего количества пройденных ожиданий, предотвращая помехи. Я считаю, что ваш ответ более надежный.

MaiaVictor 03.12.2022 22:02

@MaiaVictor По вашему мнению, pass может переполниться, вызвав панику. Таким образом, добавление обертки исправит это. Кроме того, я думаю, что это нормально. Я добавил второе решение, которое использует ваш смысл, но использует только счетчик итераций bool.

Finomnis 03.12.2022 22:09

Мне нравится решение @Finomnis. Я тоже хочу его немного улучшить. Пожалуйста, рассмотрите и мои изменения.

pub struct Barrier {
    pub done: AtomicUsize,
    pub iteration: AtomicBool,
    pub tids: usize,
}

impl Barrier {
    pub fn new(tids: usize) -> Barrier {
        Barrier {
            done: AtomicUsize::new(0),
            iteration: AtomicBool::new(false),
            tids,
        }
    }

    pub fn wait(&self) {
┌-----┬ let iteration = self.iteration.load(Ordering::Relaxed);
|     | let num_done = self.done.fetch_add(1, Ordering::AcqRel) + 1;
|     | if num_done == self.tids {
|     └->   self.done.store(0, Ordering::Relaxed);
|           self.iteration.fetch_xor(true, Ordering::AcqRel);
X           return;
|        }
|
└-----> // let iteration = self.iteration.load(Ordering::Relaxed);
        // Instruction couldn't be moved here because we have a request to 
        // see all memory changes self.iteration.fetch_xor with 
        // AcqRel ordering. It's safe to use here Relaxed ordering.

        // For example, we could imagine that Acquire-Release ordering is 
        // like a sandwich.
   ┌----- ...instructions, could be moved inside of a sandwich
   |      Acquire
   ├----->  
   |      Release
   └-X--> ...instructions, but not here.


        // Cares only for changes in self.iteration
        // Let's use here the lowest memory ordering
        while iteration == self.iteration.load(Ordering::Relaxed) {
            // I think that there could be saved a few CPU cycles if we give 
            // a compiler a hint that it's a waiting loop.
            // Note: It's a Clippy suggestion.
            std::hint::spin_loop();

            // Also, could be used yield, but is better to use a hint
            // std::thread::yield_now();
        }
    }
}

Прохладный! У меня все еще есть проблемы с пониманием различных подсказок упорядочения памяти. Я рад, что они уже потеряли два по сравнению с C++, но меня это все еще сбивает с толку: D. Не знала о подсказке spin_loop! Вот это круто, спасибо. Однако я предпочитаю if-else вместо if-return. Но это, наверное, личные предпочтения.

Finomnis 04.12.2022 22:22

Очень круто! Не видел этого ответа раньше. Для подсказки ожидания я использую Backoff crossbeam: docs.rs/crossbeam/latest/crossbeam/utils/struct.Backoff.html

MaiaVictor 06.12.2022 18:29

Другие вопросы по теме