Меня попросили написать Java-метод выполненияInParallel, который выполняется без активного ожидания и без условий гонки (поэтому я подумал, что мне нужен ArrayBlockingQueue). Меня также попросили не использовать другие классы, кроме тех, которые уже есть в моем коде.
import java.util.concurrent.ArrayBlockingQueue;
public class ParallelExecutor {
public static void executeInParallel(Runnable[] runnables, int K) {
if (K <= 0)
throw new IllegalArgumentException("Can't execute zero or less concurrent runnables");
if (runnables == null || runnables.length == 0)
throw new IllegalArgumentException("Must provide something to run");
ArrayBlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<>(K); // Extra space for end markers
// Start the producer
Thread producer = new Thread(() -> {
try {
for (Runnable r : runnables) {
taskQueue.put(r); // Put a runnable in the queue
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
// Start the consumer threads
K=Math.min(runnables.length,K);
Thread[] consumers = new Thread[K];
for (int i = 0; i < K; i++) {
consumers[i] = new Thread(() -> {
try {
Runnable r;
while((r= taskQueue.take())!=null){
r.run();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
e.printStackTrace(); // Handle exceptions as needed
}
});
consumers[i].start();
}
// Wait for the producer to finish
try {
producer.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// Wait for all consumers to finish
for (Thread consumer : consumers) {
try {
consumer.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) {
// Example usage
Runnable[] tasks = new Runnable[20];
for (int i = 0; i < tasks.length; i++) {
final int index = i;
tasks[i] = () -> {
System.out.println("Task " + index + " is running.");
try {
Thread.sleep(1000); // Simulate work that takes 1 second
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Task " + index + " is finished.");
};
}
executeInParallel(tasks, 3); // Execute tasks in parallel, 3 at a time
}
}
и код в этой части зацикливается после выполнения всех r.run:
while((r= taskQueue.take())!=null){
r.run();
}
Как я могу удалить цикл без добавления активного ожидания (мне нужно пассивное ожидание, и меня попросили не добавлять другие классы, такие как ExecutorService и т. д.)?
Я попытался удалить while с чем-то другим, но меня попросили запустить k параллельно, поэтому я не могу сделать это без дополнительных потоков, я думаю
но не лучше ли использовать потокобезопасные методы ArrayBlockingQueue? мне не нужны условия гонки в моем коде
Попробуйте создать простой фрагмент кода, который разбивает массив из 20 элементов на блоки по три. Я думаю, вам не понадобится цикл while.
я хочу сохранить правильность того, что когда один r.run заканчивается, сразу же происходит другой r.run, как в моем исходном коде, единственная проблема на данный момент - это цикл
Почему ты звонишь run() вместо start? Или даже consumers[i] = new Thread(taskQueue.take());




В потребителях вызов taskQueue.take() возвращается только тогда, когда доступен новый рабочий элемент.
Производитель производит ограниченное количество рабочих элементов, а затем останавливается. Потребители, однако, никогда не замечают, что производитель больше не активен, и покорно ждут новых рабочих элементов, никогда не заканчивая ожидание (и, следовательно, никогда не заканчивающееся).
Вы можете решить эту проблему, предоставив потребителям максимальное время ожидания, например, используя taskQueue.poll(10, TimeUnit.SECONDS).
taskQueue.take() может ждать вечно, пока новая задача не будет помещена в TaskQueue, вот исходный код:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
// continue waiting
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
поэтому рассмотрите возможность использования taskQueue.poll(0,TimeUnit.SECONDS), чтобы он мог немедленно вернуть задачу или ноль.
Вы можете использовать wait и notify, чтобы сигнализировать об обработке или прерывании потока. Для получения более подробной информации обратитесь к примеру межпоточного взаимодействия по адресу javatpoint.com/inter-thread-communication-example.