В образовательных целях я реализую классическую задачу «шипящего жужжания», используя несколько потоков.
Игра "Шипучая кайф" - это:
The player designated to go first says the number "1", and each player thenceforth counts one number in turn. However, any number divisible by three is replaced by the word fizz and any divisible by five by the word buzz. Numbers divisible by both become fizz buzz
В моей реализации у меня 4 потока:
Я не использую никаких механизмов блокировки и синхронизации потоков. Является ли моя многопоточная реализация "шипящего шума" потокобезопасной? А если нет, то почему? Я добавил комментарии в код реализации для «подозрительных» мест.
Моя реализация:
package threads;
import java.util.function.IntFunction;
public class FizzBuzzGameRunner {
// not volatile
// if other thread updates currentNum and current thread will see old (cached) value
// nothing bad can happen, just burn some CPU cycles uselessly
private int currentNum = 1;
public static void main(String... args) throws InterruptedException {
FizzBuzzGameRunner fizzBuzzGame = new FizzBuzzGameRunner();
startAll(
fizzBuzzGame.createRunnable(n -> (n % 3 != 0 && n % 5 != 0) ? String.valueOf(n) : null),
fizzBuzzGame.createRunnable(n -> (n % 3 == 0 && n % 5 != 0) ? "fizz" : null),
fizzBuzzGame.createRunnable(n -> (n % 3 != 0 && n % 5 == 0) ? "buzz" : null),
fizzBuzzGame.createRunnable(n -> (n % 3 == 0 && n % 5 == 0) ? "fizz buzz" : null)
);
Thread.sleep(1000);
}
private static void startAll(Runnable... workers) {
for (Runnable w : workers) {
Thread t = new Thread(w);
t.setDaemon(true);
t.start();
}
}
private Runnable createRunnable(IntFunction<String> singleStep) {
return () -> {
while (true) {
int currNum = this.currentNum;
// no synchronization
String result = singleStep.apply(currNum);
if (result != null) {
//Even without synchronization this block will be
//executed maximum by single thread simultaneously.
//Because each thread increments this.currentNum as part of that action,
//but no other thread will increment for the same value.
System.out.println(result);
this.currentNum++;
}
}
};
}
}
Я понимаю, что мой пример полностью искусственный. На реализацию многопоточного алгоритма «Fizz Buzz» вдохновила одна известная книга для подготовки к «собеседованию по кодированию». Я просто хотел доказать, что пример из книги (для которого требовалось иметь 4 потока) можно решить без использования синхронизации и блокировок.
@davmac в моем примере один поток обновляет curNum между A, получающим значение, и B, получающим значение. например. currentNum = 9, A считывает это значение и присваивает curNum значение 9, поток C обновляет currentNum, thead B назначает curNum = 10; теперь A и B оба имеют правильное значение, и это гонка. О, вы говорите, что A не может получить 9 до обновления C, иначе C будет иметь 9.
@matt Я не понимаю. Что вы подразумеваете под «правильным значением»? У каждого потока есть непересекающийся набор значений, с которыми они будут действовать. Каждый поток увеличивает curNum как часть этого действия, но ни один другой поток не увеличивает приращение для того же значения. В вашем примере, если A читает curNum как 9, а затем C обновляет curNum, то это должно быть потому, что C видел curNum как имеющий значение, на которое он будет действовать, а это означает, что ни 9, ни значение, которое видел C, не является значением, которое A будет действовать включен, поэтому не имеет значения, какой именно.
«правильное значение» - это значение, с которым работает конкретный поток. Правильно, C должен увидеть 8 и действовать, а затем обновить значение с 9 до 10, что может произойти, если C увидел старое значение. Поскольку поток C - единственный, который изменил бы 8 на 9, он не может видеть устаревшее значение.
@matt, верно, я думаю, что мы сейчас на одной странице.
«Потокобезопасность» не является атрибутом программ: это атрибут библиотеки / пакета / модуля / и т. д. который информирует вас о том, какие меры предосторожности вы должны предпринять при использовании в программы. Все, что вы можете сказать о программе в целом: она либо работает, либо не работает.
@Solomon Slow Я понимаю потокобезопасность как «Поточно-ориентированный код манипулирует только общими структурами данных таким образом, который гарантирует, что все потоки ведут себя должным образом и выполняют свои проектные спецификации без непреднамеренного взаимодействия» (взято из Википедии)
@Solomon Slow Я понимаю, что мой пример полностью искусственный. На реализацию многопоточного алгоритма «Fizz Buzz» вдохновила одна известная книга для подготовки к «собеседованию по кодированию». Я просто хотел доказать, что пример из книги (для которого требовалось иметь 4 потока) можно решить без использования синхронизации и блокировок.
D'Oh! Комментарий отозван.
Вы изменили вопрос, теперь переменная - volatile. Теперь вопрос задает другой вопрос, чем тот, который был задан и на который был дан ответ! Вы не должны этого делать - пожалуйста, измените его обратно. Если вы хотите спросить, достаточно ли volatile, вам следует открыть другой вопрос и обратиться к этому.
Действительная точка @davmac - изменено обратно




Он не свободен от гонки (что, я думаю, вы действительно спрашиваете), потому что потоки читают currentNum, когда он был написан другим потоком, без какой-либо синхронизации. Не гарантируется, что каждый поток увидит последнее значение - каждый поток увидит либо последнее записанное им значение, либо любое значение, записанное с тех пор любым другим потоком.
Это может означать, что вы в конечном итоге не продвигаетесь вперед ни в одном потоке, поскольку каждый поток может просто не увидеть изменения, внесенные в любой другой поток. Вы можете использовать AtomicInteger для решения этой проблемы.
Я также не уверен, что эффекты this.currentNum++; гарантированно будут видны другим потокам в том же порядке, что и в исходном потоке. Я подозреваю, что теоретически вывод и приращение можно было бы переупорядочить, например:
this.currentNum++;
System.out.println(result);
Это может привести к непоследовательному результату.
Я думаю, что строка "this.currentNum ++" может быть достигнута только одним потоком одновременно, потому что только для одного потока "result = singleStep.apply (currNum)" не может быть одновременно нулевым. Итак, this.currentNum ++; и System.out.printLn никогда не будет выполняться несколькими потоками
@Palladium, почему вы думаете, что "result = singleStep.apply (currNum)" не может быть не нулевым одновременно в нескольких потоках?
@Palladium У вас есть 4 потока, работающих одновременно, нигде нет синхронизации, которая не позволяет этим потокам одновременно обращаться к currNum, или не позволяет потокам обрабатывать один и тот же currNum, или гарантирует, что порядок вывода правильный
@davmac, потому что "singleStep.apply (currNum);" может возвращать не null только тогда, когда currNum «подходит» для текущего потока, а это может происходить только для одного потока одновременно.
@Palladium, это противно. Возможно, вы правы в том, что приращение не может происходить одновременно из разных потоков. Но есть еще существенные проблемы. Я перефразировал ответ.
@davmac Почему можно изменить порядок вывода и приращения? (Я согласен, это будет проблемой)
Я согласен: «Я могу закончить с непрямым прогрессом» (потому что один поток может долгое время не видеть «кешированные» значения), разве не будет то же самое, если я использую volatile int currentNum = 1 вместо «AtomicInteger» currentNum = новый новый AtomicInteger (1) "?
@Palladium q1: потому что, как правило, без синхронизации поток может видеть, что действия другого потока происходят в другом порядке. q2: да, думаю, что volatile было бы вполне достаточно.
Кстати: я думаю, что лучшим решением (производительность на моей машине улучшилась примерно в 4 раза) было бы синхронизировать весь блок, который проверяет и увеличивает значение currentNum.
Хорошо, я изменил его так, что вместо вывода вывода он отправляет его в очередь. У меня очередь проверяет, что значение удовлетворяет условиям. Всегда ломается, потому что очередь заполнялась. В качестве альтернативы вы можете записать вывод в файл и запустить его пару тысяч раз, чтобы проверить, все ли они сломаны. У меня никогда не ломается. хотя и не является доказательством того, что он свободен от гонки, обратное показало бы гонку.
import java.util.function.IntFunction;
import java.util.concurrent.ArrayBlockingQueue;
public class FizzBuzzGameRunner {
static ArrayBlockingQueue<String> output = new ArrayBlockingQueue<>(100);
private int currentNum = 1;
public static void main(String... args) throws InterruptedException {
FizzBuzzGameRunner fizzBuzzGame = new FizzBuzzGameRunner();
startAll(
fizzBuzzGame.createRunnable(
n -> {
if (n % 3 != 0 && n % 5 != 0) {
return String.valueOf(n);
}
return null;
}),
fizzBuzzGame.createRunnable(
n -> {
if (n % 3 == 0 && n % 5 != 0) {
return "fizz";
}
return null;
}),
fizzBuzzGame.createRunnable(
n -> {
if (n % 3 != 0 && n % 5 == 0) {
return "buzz";
}
return null;
}),
fizzBuzzGame.createRunnable(
n -> {
if (n % 3 == 0 && n % 5 == 0) {
return "fizz buzz";
}
return null;
})
);
int n = 1;
String s;
while(true){
s = output.take();
if (n % 3 != 0 && n % 5 != 0) {
if (!String.valueOf(n).equals(s)){
break;
}
}
if (n % 3 == 0 && n % 5 != 0) {
if (!"fizz".equals(s)) break;
}
if (n % 3 != 0 && n % 5 == 0) {
if (!"buzz".equals(s)) break;
}
if (n % 3 == 0 && n % 5 == 0) {
if (!"fizz buzz".equals(s)) break;
}
n++;
}
System.out.println(s + " out of order after" + n);
}
private static void startAll(Runnable... workers) {
for (Runnable w : workers) {
Thread t = new Thread(w);
t.setDaemon(true);
t.start();
}
}
private Runnable createRunnable(IntFunction<String> singleStep) {
return () -> {
while (true) {
int currNum = this.currentNum;
// no synchronization
// even if other thread updates currentNum nothing bad can happen
String result = singleStep.apply(currNum);
if (result != null) {
//even without synchronization this block will be
//executed maximum by single thread simultaneously
try{
output.put(result);
} catch(InterruptedException e){
break;
}
this.currentNum++;
}
}
};
}
}
Если вы позволите своей версии запускаться ~ 100000 раз, направить вывод в файл и проверить вывод, я думаю, вы найдете несколько неудачных строк. Как вы думаете, почему проблема в output.offer?
Возникла проблема со строкой примера "output.offer (result)". предложение игнорирует элемент, если очередь заполнена. Я изменил "output.offer (result);" в "output.put (результат);" и все работает как положено (работает долго)
Итак, вы делаете какую-то спин-блокировку. Потоки могут пропустить чтение и ничего не делать, или они могут прочитать и обновить свое значение.
@matt Я думаю, что идея в том (хотя вопрос может быть связан с дополнительным объяснением ...), что только поток, который удовлетворяет его условию, будет увеличивать currNum. Поэтому каждый поток ничего не делает до тех пор, пока другие потоки не увеличат currNum до соответствующего значения (в каждом случае вызывая вывод соответствующей природы: «шипение», «жужжание», «шипение» или число). Это сложнее, чем кажется на первый взгляд.