Я попытаюсь кратко объяснить концепцию блокировки потока, которую я придумал, используя пример. Рассмотрим следующий пример программы.
public class Main {
public static void main(String[] args) {
Data data = new Data();
while (true) {
doStuff();
doStuff();
for (int i = 0; i < 256; i++) {
System.out.println("Data " + i + ": " + data.get(i));
}
doStuff();
doStuff();
for (int i = 0; i < 256; i++) {
data.set(i, (byte) (data.get(i) + 1));
}
doStuff();
doStuff();
}
}
public static void doStuff() {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class Data {
private final byte[] data = new byte[256];
public byte get(int i) {
return data[i];
}
public void set(int i, byte data) {
this.data[i] = data;
}
}
Важно, чтобы модифицировался только основной поток data. Теперь я хочу сделать цикл, который печатает data асинхронно.
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Data data = new Data();
while (true) {
doStuff();
doStuff();
executorService.submit(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 256; i++) {
System.out.println("Data " + i + ": " + data.get(i));
}
}
});
doStuff();
doStuff();
for (int i = 0; i < 256; i++) {
data.set(i, (byte) (data.get(i) + 1));
}
doStuff();
doStuff();
}
}
После отправки задачи в executorService основной поток теперь может продолжать работать по своему усмотрению. Проблема в том, что основной поток потенциально может достичь точки, в которой он изменяет data до того, как он будет напечатан, но состояние data должно быть напечатано при его отправке.
Я знаю, что в этом случае я могу создать копию data перед отправкой, которая будет напечатана, но на самом деле это не то, что я хочу делать. Имейте в виду, что это всего лишь пример, и копирование в реальном коде может оказаться дорогостоящей операцией.
Это решение, которое я придумал для этой проблемы.
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
Data data = new Data();
Lock lock = new Lock(); // <---------------
while (true) {
doStuff();
doStuff();
lock.lock(); // <---------------
executorService.submit(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 256; i++) {
System.out.println("Data " + i + ": " + data.get(i));
}
lock.unlock(); // <---------------
}
});
doStuff();
doStuff();
lock.waitUntilUnlock(); // <---------------
for (int i = 0; i < 256; i++) {
data.set(i, (byte) (data.get(i) + 1));
}
doStuff();
doStuff();
}
}
public class Lock {
private final AtomicInteger lockCount = new AtomicInteger();
public void lock() {
lockCount.incrementAndGet();
}
public synchronized void unlock() {
lockCount.decrementAndGet();
notifyAll();
}
public synchronized void waitUntilUnlock() {
while (lockCount.get() > 0) {
try {
wait();
} catch (InterruptedException e) {
}
}
}
}
Теперь основной поток может перейти к работе над другими вещами после отправки data. По крайней мере, может, пока не достигнет точки, в которой он изменит data.
Вопрос: это хороший или плохой дизайн? Или есть лучшая (уже существующая) реализация этой проблемы?
Обратите внимание, что ReentrantLock в этом случае не работает. Я должен заблокировать перед отправкой в основном потоке и снять блокировку в потоке исполнителя.




Java имеет абстракции синхронизации более высокого уровня. В общем, вам действительно следует избегать функций wait() и notifyAll(), которые слишком низкоуровневы и сложны для правильного использования и чтения.
В этом случае вы можете просто использовать общую очередь блокировки (мне кажется подходящей синхронная очередь) между обоими потоками:
ExecutorService executorService = Executors.newSingleThreadExecutor();
Data data = new Data();
SynchronousQueue queue = new SynchronousQueue();
while (true) {
doStuff();
doStuff();
executorService.submit(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 256; i++) {
System.out.println("Data " + i + ": " + data.get(i));
}
queue.put(data);
}
});
doStuff();
doStuff();
data = queue.take();
for (int i = 0; i < 256; i++) {
data.set(i, (byte) (data.get(i) + 1));
}
doStuff();
doStuff();
}
Да, поток печати может освободить разрешение, а основной поток заблокировать до тех пор, пока разрешение не будет доступно.
@ stonar96, Один из способов думать о Semaphore — это думать о нем как о блокирующей очереди безинформационных жетоны (также иногда называемой «разрешает»).
@JBNizet, этот ответ можно было бы несколько улучшить, переместив вызов new Data() в задачу, которая отправляется службе-исполнителю. Эта деталь может не иметь большого значения в таком тривиальном примере, как этот, но в более сложном приложении сделать объект data недоступным для основного потока до тех пор, пока основной поток не имеет смысла работать с ним, может быть просто вещь, чтобы помешать какому-нибудь младшему программисту «оптимизировать» код, позволив основному потоку начать выполнять свою работу до того, как data будет готов.
Да, я думал об этом, но я хотел сохранить ту же структуру кода, что и исходная, чтобы показать минимальное изменение. Поскольку runnable реализован как анонимный класс, выполнение того, что вы предлагаете, невозможно без рефакторинга кода.
Пока спасибо, думаю, понял. Но чего я не понимаю, так это того, как я могу использовать Queue или Semaphore, как показано в ответе, если должно быть несколько отправок одного и того же data в executorService? В этом случае #poll() возвращается после того, как первая задача готова, а остальные, вероятно, нет. Я решил это в своей реализации Lock путем подсчета.
@ stonar96 этот пост отвечает на ваш конкретный вопрос. Если у вас другой, то задайте другой вопрос. Ответ может быть похожим, или я могу быть другим: это зависит от проблемы разное, которую вы пытаетесь решить.
@JB Nizet Я заметил, что #poll() следует заменить на #take(). #poll() просто возвращает null, если в очереди нет элемента, однако #take() ожидает элемент из-за Javadocs.
@ stonar96 ты абсолютно прав. Я исправил пост. Спасибо.
@JB Nizet Спасибо, я думаю, что #put(E) тоже неправильно. Это не должно ждать, так что #offer(E) правильный выбор.
На самом деле, если цель состоит в том, чтобы не ждать, синхронную очередь вообще не следует использовать. LinkedBlockingQueue будет более подходящим. offer() ничего не сделает, если ни один поток не ожидает в случае SynchronousQueue (если только я неправильно интерпретирую javadoc). В любом случае, смысл ответа состоит в том, чтобы показать одно из возможных решений, используя абстракцию более высокого уровня. Может существовать несколько вариантов.
Кажется, что это достаточно хорошо покрывается базовой синхронизацией: doStuff ожидает, что у него будет единоличный доступ к данным, предположительно, чтобы он мог безопасно изменять данные в целом. Между тем, doPrint ожидает работы на стабильных данных. В этих двух случаях данные являются единицей состояния, доступ к которой контролируется, и соответствующий метод блокировки заключается в синхронизации экземпляра данных, к которому осуществляется доступ.
public void runningInThread1() {
Data someData = getData(); // Obtain the data which this thread is using
doStuff(someData); // Update the data
// Looping and such omitted.
}
public void runningInThread2() {
Data someData = getData();
doPrint(someData); // Display the data
}
public void doStuff(Data data) {
synchronized ( data ) {
// Do some stuff to the data
}
}
public void doPrint(Data data) {
synchronized ( data ) {
// Display the data
}
}
В качестве альтернативы, если doStuff и doPrint реализованы как методы экземпляра Data, то синхронизация может быть выполнена путем добавления ключевого слова synchronized к методам экземпляра.
Это недостаточно хорошо покрывается базовой синхронизацией. У этого все еще есть проблема, которую я описал в своем вопросе. Это не гарантирует правильный порядок печати и изменения.
Проблема в том, что когда data отправляется для печати асинхронно, код не находится внутри вашего синхронизированного блока, и пока он не будет внутри, data может быть изменен, чего я хочу избежать, поскольку я хочу напечатать состояние данных, которое было Отправлено.
Можете ли вы описать ожидаемую последовательность действий doPrint и doStuff и как запланированы эти действия? Например, есть ли у вас время от времени doStuff, который запускается внешним и, по сути, случайным событием, в то время как doPrint должен выполняться через запланированные интервалы времени?
Но похоже, что вы ищете что-то вроде Thread.join. См. docs.oracle.com/javase/tutorial/essential/concurrency/join.html. Но также есть много образцов, которые можно найти.
На мой взгляд, последовательность хорошо описана в примерах кодов вопроса. Основной поток в любое время отправляет data в executorService, а основной поток изменяет data в любое время. Теперь я хочу убедиться, что data не изменяется с момента, когда основной поток отправляет data, до тех пор, пока executorService не будет готов. Обратите внимание, что есть только один поток (основной), который отправляет и изменяет data. Проблема с базовой синхронизацией заключается в том, что когда основной поток отправляется и продолжает работать, не гарантируется, что синхронизированный doPrint() уже был вызван.
Вот почему я поместил lock.lock() в код своего примера перед отправкой в основной поток и lock.unlock() внутрь метода run() в потоке исполнителя. При базовой синхронизации было бы эквивалентно иметь как lock.lock(), так и lock.unlock() внутри метода run(). Thread#join() может хорошо сработать для этого, но, как сказано в других ответах, я должен использовать синхронизацию более высокого уровня, и мне это кажется довольно низким уровнем. Спасибо за ваши усилия в любом случае!
Тогда цель состоит в том, чтобы позволить основному потоку продолжать работу, в то время как doPrint продолжается асинхронно? (Эта активность представляет собой что-то помимо обновления данных через doStuff, иначе ничего не будет получено от использования doPrint в другом потоке.) В этом случае синхронизации все еще достаточно, за исключением того, что запрос на печать данных не будет обязательно распечатайте текущие данные, так как поток печати не будет гарантированно запущен до того, как основной поток внесет изменения в данные.
Тогда возникает вопрос, как обеспечить выполнение операции печати до следующего обновления данных.
Операция, которая кажется необходимой, — это «выдача всем потокам печати» (или просто «выдача всем потокам печати, если один из них запущен»), которая выполняется в начале вызова doStuff. Это будет реализовано с помощью Thread.join основного потока ко всем текущим потокам печати в качестве первого шага doStuff (или перед каждым вызовом doStuff). Блокировки или фьючерсы кажутся ненужными и чрезмерно сложными.
Да, цель состоит в том, чтобы основной поток мог продолжать работу, в то время как doPrint продолжает работать асинхронно. Основной поток может продолжать работу до тех пор, пока не достигнет точки изменения data. > В этом случае синхронизации по-прежнему достаточно, за исключением того, что запрос на печать данных не обязательно будет печатать текущие данные, поскольку поток печати не будет гарантированно запущен до того, как основной поток внесет изменения в данные. < Я именно об этом и говорю. Он должен печатать текущие данные. Теперь вопрос в том, как мне вызвать join(), если я использую ExecutorService?
Вы хотите, чтобы эта вещь Data была построена асинхронно, и основной поток хочет иметь возможность перейти к определенной точке, а затем должен получить завершенный объект. Для этого и нужны фьючерсы, они дают вам ссылку на вычисление, которое, возможно, еще не завершено.
Перепишите асинхронную часть как Callable, чтобы в результате она возвращала Data.
Callable<Integer> task = () -> {
Data data = new Data();
for (int i = 0; i < 256; i++) {
System.out.println("Data " + i + ": " + data.get(i));
}
return data;
};
ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Data> future = executor.submit(task);
doStuff();
// ... main thread goes about its business
// when you get to a point where you need data,
// you can block here until the computation is done
Data data = future.get();
Таким образом, Future обеспечивает видимость объекта Data во всех потоках.
Спасибо! У меня была почти такая же идея, когда вы писали ответ (см. Мой ответ). Что вы думаете о моем дополнительном Queue для поддержки нескольких заявок?
@ stonar96: с очередью все в порядке, все зависит от того, что вы делаете. он следует совету использовать абстракции более высокого уровня.
Я нашел альтернативный подход к проблеме, который также отвечает на мой комментарий из ответа @JB Nizet. ExecutorService#submit(Runnable) возвращает Future<?>, который можно использовать для ожидания готовности задачи. Если должно быть возможно несколько представлений, можно просто создать Queue<Future<?>> queue , всегда предлагать Future<?>, возвращенное ExecutorService#submit(Runnable), queue и в точке, где основной поток должен ждать только #poll().get() весь queue.
Обновлено: я также нашел соответствующий ответ здесь: https://stackoverflow.com/a/20496115/3882565
Спасибо, мне очень нравится этот ответ. Я также прочитал о
Semaphoreи теперь думаю, что смогу написать свой код, используя их. Что вы о них думаете? Они все еще слишком низкоуровневые?