Wait() и NotifyAll() не работают в программе

Я пишу промежуточное программное обеспечение, ориентированное на сообщения, в котором подписчики и издатели общаются через очередь в MessageBroker среднего класса. издатели должны помещать сообщения в очередь темы, если она не заполнена, а подписчики должны получать сообщения темы, на которую подписаны, если очередь не пуста. Проблема теперь возникает, когда я пытаюсь включить wait() NotifyAll() в метод получения сообщения() подписчика. С Издателем работает без проблем, а вот с Подписчиком у меня проблема в том, что они не выходят из состояния ожидания, поэтому ничего не делают.

Метод издателя:

public synchronized void sendMessage() {
        BlockingQueue<Message> topicQueue = mb.getQueue(topic);
        if (topicQueue != null) {
            try {
                // Überprüfen, ob Platz in der Queue ist, bevor Nachricht gesendet wird
                while (topicQueue.remainingCapacity() == 0) {
                    wait();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            Message m = topic.generateMessage();
            mb.publish(m);
            if (!gotActive) {
                mb.increasePublisherCounter(1);
                gotActive = true;
            }
            System.out.println(m.getContent() + " wurde zur Queue hinzugefügt");
            notifyAll(); // Alle Threads benachrichtigen
        }
    }

Абонентский метод:

public synchronized void receiveMessage() {
        for (Topic topic : topics) {
            BlockingQueue<Message> queue = mb.getQueue(topic);
            synchronized (queue) {
                try {
                    // Warten, bis die Warteschlange nicht mehr leer ist
                    while (queue.isEmpty()) {
                        queue.wait(); // Warte auf Benachrichtigung, wenn die Warteschlange leer ist
                    }

                    // Nachricht aus der Warteschlange holen
                    Message message = queue.peek();
                    if (message != null) {
                        System.out.println(name + " hat Nachricht erhalten: " + message.getContent());
                        incrementProcessedCounter(topic);
                        if (topic.getSubscriberCount() == getProcessedCounter(topic)) {
                            queue.remove();
                            resetProcessedCounter(topic);
                            queue.notifyAll(); // Benachrichtige andere wartende Threads
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

что мне нужно изменить в этих методах, чтобы они работали? Спасибо за вашу помощь. Я также могу добавить больше кода, если вам нужно :)

издатель должен синхронизироваться в очереди, а не в классе издателя. ПриемMessage также выглядит немного странно — он ждет, когда очередь пуста по одной теме, но не проверяет другие темы. При этом, если вы используете очередь блокировки, вам не нужно ждать вручную, поскольку очередь блокировки сделает это за вас.

mlecz 25.05.2024 11:27

Итак, wait() notifyAll() не требуется, если я использую BlockingQueue?

LennartPfeiler 25.05.2024 11:32

Вам понадобится либо wait()/notify() с блоком/методом синхронизации, либо BlockingQueue (без синхронизации и wait()/notify()), чтобы добиться синхронизации между потоками для проблемы производителя/потребителя. BlockingQueue внутренне блокирует/уведомляет потоки с помощью wait()/notify().

CyberMafia 25.05.2024 13:28

Не мог бы кто-нибудь из вас опубликовать решение в качестве ответа на вопрос, пожалуйста?

Anonymous 25.05.2024 17:31
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
4
54
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

когда я пытаюсь включить wait() NotifyAll() в метод получения сообщения() подписчика. С Издателем работает без проблем, а вот с Подписчиком у меня проблема в том, что они не выходят из состояния ожидания, поэтому ничего не делают.

У меня есть ряд вопросов по поводу вашего кода, но первый комментарий заключается в том, что вам не нужно ждать/уведомлять себя. Одним из преимуществ BlockingQueue является то, что они полностью синхронизированы. Вызов queue.take() будет блокироваться до тех пор, пока не появится сообщение для использования, а queue.put() будет ждать, пока в очереди не появится место для его размещения.

public void sendMessage() {
    BlockingQueue<Message> topicQueue = mb.getQueue(topic);
    if (topicQueue != null) {
        Message m = topic.generateMessage();
        topicQueue.put(m);
        // ... counters and the like?
    }
}

и

public void receiveMessage() {
    for (Topic topic : topics) {
        BlockingQueue<Message> queue = mb.getQueue(topic);
        Message message = queue.take();
        // ... work with the message
     }
}

Очень сложно сказать, почему ваш код не работает. Вот несколько комментариев:

  • Я не вижу, чтобы вы помещали сообщение в topicQueue в свой sendMessage().
  • Будьте очень осторожны с тем, что вы синхронизируете. И издатель, и подписчик должны синхронизироваться с одним и тем же экземпляром объекта, если один ожидает уведомления от другого.
  • Будьте осторожны со счетчиком и активным логическим значением, чтобы они были правильно синхронизированы, если они записываются/читаются разными потоками.
  • peek() и remove() можно выполнить с помощью poll(), который выполняет обе эти задачи, возвращая null, если в очереди нет сообщений. Опять take()ждёт.
  • Используйте хорошие шаблоны с InterruptedException. Вы должны, по крайней мере, сделать Thread.currentThread().interrupt(); в блоке catch, но вы также должны вернуться или выйти из цикла или что-то в этом роде.

Надеюсь, что-то здесь поможет.

Другие вопросы по теме