Я написал следующую реализацию барьера, используя только атомы:
use std::sync::atomic::{AtomicUsize, Ordering};
pub struct Barrier {
pub done: AtomicUsize,
pub tids: usize,
}
impl Barrier {
pub fn new(tids: usize) -> Barrier {
Barrier {
done: AtomicUsize::new(0),
tids,
}
}
pub fn wait(&self) {
let done = self.done.fetch_add(1, Ordering::SeqCst);
if done + 1 == self.tids {
self.done.store(0, Ordering::SeqCst);
} else {
while self.done.load(Ordering::SeqCst) != 0 {}
}
}
}
Это не работает, как ожидалось. Например,
// inside threads loop
barrier.wait();
println!("a");
barrier.wait();
println!("b");
Интуитивно это должно работать, так как после вызова .wait()
он будет зависать в цикле while
, вырываясь из него после того, как все потоки вызвали .wait()
, и сбрасывая счетчик для следующего .wait()
. Вместо этого, в конце концов, он зависнет. Ниже приведен пример использования:
fn main() {
println!("Hello, world!");
let barrier = &Barrier::new(10);
std::thread::scope(|s| {
for tid in 0 .. 10 {
s.spawn(move || {
loop {
barrier.wait();
println!("{} a", tid);
barrier.wait();
println!("{} b", tid);
}
});
}
});
}
Редактировать: о, теперь я вижу проблему. Он зависнет, потому что поток a может увеличить счетчик для следующего .wait()
до того, как другой поток выйдет из цикла. Глупый. Хотя оставлю вопрос открытым.
@cafce25 Сделаю, как только найду решение
Да, именно в этом проблема. Одна итерация может смешиваться со следующей, один быстрый поток может блокировать другой поток из предыдущей итерации. Не гарантируется, что все потоки будут освобождены должным образом.
Если вы всегда используете одни и те же потоки, вы можете решить эту проблему, установив второй счетчик «выполнено» и переключаясь между ними между итерациями.
Проблема в том, что существует состояние гонки между двумя последовательными барьерами:
1
, полностью пропуская снятие барьера.Если вы уверены, что всегда используете одни и те же потоки, вы можете исправить это, используя два счетчика и переключаясь между ними. Таким образом, все потоки ждут либо первого, либо второго. Но у одного потока нет возможности обойти другие, так как ему придется пройти через второй счетчик, чтобы снова заблокировать первый, а второй разблокируется только в том случае, если в первом не осталось ни одного потока.
Этот, кажется, работает:
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
pub struct Barrier {
pub done: [AtomicUsize; 2],
pub use_first_done: AtomicBool,
pub tids: usize,
}
impl Barrier {
pub fn new(tids: usize) -> Barrier {
Barrier {
done: [AtomicUsize::new(0), AtomicUsize::new(0)],
use_first_done: AtomicBool::new(true),
tids,
}
}
pub fn wait(&self) {
let done = if self.use_first_done.load(Ordering::SeqCst) {
&self.done[0]
} else {
&self.done[1]
};
let num_done = done.fetch_add(1, Ordering::SeqCst) + 1;
if num_done == self.tids {
self.use_first_done.fetch_xor(true, Ordering::SeqCst);
done.store(0, Ordering::SeqCst);
} else {
while done.load(Ordering::SeqCst) != 0 {}
}
}
}
fn main() {
println!("Hello, world!");
let barrier = &Barrier::new(10);
std::thread::scope(|s| {
for tid in 0..10 {
s.spawn(move || loop {
barrier.wait();
println!("{} a", tid);
barrier.wait();
println!("{} b", tid);
});
}
});
}
Альтернативой может быть использование счетчика итераций.
По той же причине, по которой работает переключение между двумя счетчиками done
, счетчика итераций из двух итераций (= логическое значение) должно быть достаточно.
Это работает и для меня:
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
pub struct Barrier {
pub done: AtomicUsize,
pub iteration: AtomicBool,
pub tids: usize,
}
impl Barrier {
pub fn new(tids: usize) -> Barrier {
Barrier {
done: AtomicUsize::new(0),
iteration: AtomicBool::new(false),
tids,
}
}
pub fn wait(&self) {
let iteration = self.iteration.load(Ordering::SeqCst);
let num_done = self.done.fetch_add(1, Ordering::SeqCst) + 1;
if num_done == self.tids {
self.done.store(0, Ordering::SeqCst);
self.iteration.fetch_xor(true, Ordering::SeqCst);
} else {
while iteration == self.iteration.load(Ordering::SeqCst) {}
}
}
}
fn main() {
println!("Hello, world!");
let barrier = &Barrier::new(10);
std::thread::scope(|s| {
for tid in 0..10 {
s.spawn(move || loop {
barrier.wait();
println!("{} a", tid);
barrier.wait();
println!("{} b", tid);
});
}
});
}
ВАЖНО: Это работает, только если потоки всегда идентичны. Если этот барьер используют разные потоки, то необходимо иметь больший счетчик итераций.
Выглядит действительно хорошо, спасибо. В итоге я разработал эту версию на основе этого вопроса. Он использует дополнительную переменную для подсчета общего количества пройденных ожиданий, предотвращая помехи. Я считаю, что ваш ответ более надежный.
@MaiaVictor По вашему мнению, pass
может переполниться, вызвав панику. Таким образом, добавление обертки исправит это. Кроме того, я думаю, что это нормально. Я добавил второе решение, которое использует ваш смысл, но использует только счетчик итераций bool.
Мне нравится решение @Finomnis. Я тоже хочу его немного улучшить. Пожалуйста, рассмотрите и мои изменения.
pub struct Barrier {
pub done: AtomicUsize,
pub iteration: AtomicBool,
pub tids: usize,
}
impl Barrier {
pub fn new(tids: usize) -> Barrier {
Barrier {
done: AtomicUsize::new(0),
iteration: AtomicBool::new(false),
tids,
}
}
pub fn wait(&self) {
┌-----┬ let iteration = self.iteration.load(Ordering::Relaxed);
| | let num_done = self.done.fetch_add(1, Ordering::AcqRel) + 1;
| | if num_done == self.tids {
| └-> self.done.store(0, Ordering::Relaxed);
| self.iteration.fetch_xor(true, Ordering::AcqRel);
X return;
| }
|
└-----> // let iteration = self.iteration.load(Ordering::Relaxed);
// Instruction couldn't be moved here because we have a request to
// see all memory changes self.iteration.fetch_xor with
// AcqRel ordering. It's safe to use here Relaxed ordering.
// For example, we could imagine that Acquire-Release ordering is
// like a sandwich.
┌----- ...instructions, could be moved inside of a sandwich
| Acquire
├----->
| Release
└-X--> ...instructions, but not here.
// Cares only for changes in self.iteration
// Let's use here the lowest memory ordering
while iteration == self.iteration.load(Ordering::Relaxed) {
// I think that there could be saved a few CPU cycles if we give
// a compiler a hint that it's a waiting loop.
// Note: It's a Clippy suggestion.
std::hint::spin_loop();
// Also, could be used yield, but is better to use a hint
// std::thread::yield_now();
}
}
}
Прохладный! У меня все еще есть проблемы с пониманием различных подсказок упорядочения памяти. Я рад, что они уже потеряли два по сравнению с C++, но меня это все еще сбивает с толку: D. Не знала о подсказке spin_loop
! Вот это круто, спасибо. Однако я предпочитаю if-else вместо if-return. Но это, наверное, личные предпочтения.
Очень круто! Не видел этого ответа раньше. Для подсказки ожидания я использую Backoff crossbeam: docs.rs/crossbeam/latest/crossbeam/utils/struct.Backoff.html
@cafce25 добавил пример использования и исправил проблему с вопросом.