Я пишу промежуточное программное обеспечение, ориентированное на сообщения, в котором подписчики и издатели общаются через очередь в MessageBroker среднего класса. издатели должны помещать сообщения в очередь темы, если она не заполнена, а подписчики должны получать сообщения темы, на которую подписаны, если очередь не пуста. Проблема теперь возникает, когда я пытаюсь включить wait() NotifyAll() в метод получения сообщения() подписчика. С Издателем работает без проблем, а вот с Подписчиком у меня проблема в том, что они не выходят из состояния ожидания, поэтому ничего не делают.
Метод издателя:
public synchronized void sendMessage() {
BlockingQueue<Message> topicQueue = mb.getQueue(topic);
if (topicQueue != null) {
try {
// Überprüfen, ob Platz in der Queue ist, bevor Nachricht gesendet wird
while (topicQueue.remainingCapacity() == 0) {
wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
Message m = topic.generateMessage();
mb.publish(m);
if (!gotActive) {
mb.increasePublisherCounter(1);
gotActive = true;
}
System.out.println(m.getContent() + " wurde zur Queue hinzugefügt");
notifyAll(); // Alle Threads benachrichtigen
}
}
Абонентский метод:
public synchronized void receiveMessage() {
for (Topic topic : topics) {
BlockingQueue<Message> queue = mb.getQueue(topic);
synchronized (queue) {
try {
// Warten, bis die Warteschlange nicht mehr leer ist
while (queue.isEmpty()) {
queue.wait(); // Warte auf Benachrichtigung, wenn die Warteschlange leer ist
}
// Nachricht aus der Warteschlange holen
Message message = queue.peek();
if (message != null) {
System.out.println(name + " hat Nachricht erhalten: " + message.getContent());
incrementProcessedCounter(topic);
if (topic.getSubscriberCount() == getProcessedCounter(topic)) {
queue.remove();
resetProcessedCounter(topic);
queue.notifyAll(); // Benachrichtige andere wartende Threads
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
что мне нужно изменить в этих методах, чтобы они работали? Спасибо за вашу помощь. Я также могу добавить больше кода, если вам нужно :)
Итак, wait() notifyAll() не требуется, если я использую BlockingQueue?
Вам понадобится либо wait()/notify() с блоком/методом синхронизации, либо BlockingQueue (без синхронизации и wait()/notify()), чтобы добиться синхронизации между потоками для проблемы производителя/потребителя. BlockingQueue внутренне блокирует/уведомляет потоки с помощью wait()/notify().
Не мог бы кто-нибудь из вас опубликовать решение в качестве ответа на вопрос, пожалуйста?




когда я пытаюсь включить wait() NotifyAll() в метод получения сообщения() подписчика. С Издателем работает без проблем, а вот с Подписчиком у меня проблема в том, что они не выходят из состояния ожидания, поэтому ничего не делают.
У меня есть ряд вопросов по поводу вашего кода, но первый комментарий заключается в том, что вам не нужно ждать/уведомлять себя. Одним из преимуществ BlockingQueue является то, что они полностью синхронизированы. Вызов queue.take() будет блокироваться до тех пор, пока не появится сообщение для использования, а queue.put() будет ждать, пока в очереди не появится место для его размещения.
public void sendMessage() {
BlockingQueue<Message> topicQueue = mb.getQueue(topic);
if (topicQueue != null) {
Message m = topic.generateMessage();
topicQueue.put(m);
// ... counters and the like?
}
}
и
public void receiveMessage() {
for (Topic topic : topics) {
BlockingQueue<Message> queue = mb.getQueue(topic);
Message message = queue.take();
// ... work with the message
}
}
Очень сложно сказать, почему ваш код не работает. Вот несколько комментариев:
topicQueue в свой sendMessage().peek() и remove() можно выполнить с помощью poll(), который выполняет обе эти задачи, возвращая null, если в очереди нет сообщений. Опять take()ждёт.InterruptedException. Вы должны, по крайней мере, сделать Thread.currentThread().interrupt(); в блоке catch, но вы также должны вернуться или выйти из цикла или что-то в этом роде.Надеюсь, что-то здесь поможет.
издатель должен синхронизироваться в очереди, а не в классе издателя. ПриемMessage также выглядит немного странно — он ждет, когда очередь пуста по одной теме, но не проверяет другие темы. При этом, если вы используете очередь блокировки, вам не нужно ждать вручную, поскольку очередь блокировки сделает это за вас.