Как правильно останавливать и перезапускать потоки, находясь внутри потока?

Я запускаю приложение SpringBoot с бизнес-службой, которая постоянно запускает метод, который заполняет глобальную очередь своими сообщениями. Внутри этого метода, поскольку он запускается более 15-30 раз в секунду, метод проверяет глобальный массив логических значений, чтобы убедиться, что поток с определенным индексом (0-7) не запущен. Если поток не запущен, метод запускает поток. Каждый поток отвечает за чтение из глобальной очереди сообщений и выполнение задачи над этим сообщением. Однако я заметил, что по мере того, как приложение работает дольше, я вижу только один из этих отдельных потоков, выполняющих операцию, в то время как другие зависают.

Мой вопрос, если у меня есть:

public static void onMessage(String record) {
     global.add(record);
     if (threads[0] == false) {
     threads[0] = true;
     thread0.start() // Name of the thread index included in runnable
> }

В основном вышеперечисленное постоянно срабатывает, заполняет очередь, пытается запустить поток и все.

Тогда у меня есть:

public void run() { 
    String recordToUse;
    int thread_num = Integer.parseInt(Thread.currentThread().getName());
    long startThreadTime = System.currentTimeMillis();
    long endThreadTime = startThreadTime + 60 * 1000; // Run the thread for 1 minute.
                
    while(System.currentTimeMillis() < endThreadTime) {
        if (!global.isEmpty()) {
            recordToUse = global.remove();
            System.out.println("Successful removal: Thread-"+ thread_num);
        } else {
            continue; // If the queue is empty, keep checking until it is not empty. 
        }
        // Then we have more operations that work on the message
    }
    threads[thread_num] = false; // Mark that this thread is now finished and the onMessage method knows it can start another thread.
    return;
}
           

Реализация runnable принимает число в качестве аргумента, чтобы просто сохранить номер потока внутри потока, поэтому у нас есть что-то вроде Thread thread1 = new Thread(runnable,"0"); чтобы указанный выше поток знал, что это индекс номер 0.

После того, как приложение работает по назначению, я вижу, что сначала из 8 намеченных потоков сначала оно работает нормально и отображает сообщения вроде:

Successful removal: Thread-2
Successful removal: Thread-0
Successful removal: Thread-5
Successful removal: Thread-7
Successful removal: Thread-4
Successful removal: Thread-1

Но примерно через 5-10 минут он фокусируется на одном потоке. Я вижу только сообщения типа:

Successful removal: Thread-6
Successful removal: Thread-6
Successful removal: Thread-6
Successful removal: Thread-6
Successful removal: Thread-6
Successful removal: Thread-6

Я не совсем уверен, что происходит, и некоторые указания были бы замечательными! Моя идея заключается в том, что поток обращается к синхронизированному методу, а все остальные потоки ждут его, но я не уверен, что это так, поскольку все они изначально работают.

Спасибо.

Я пытался засыпать потоки здесь и там, я играл с продолжением потока, но я не вижу, где эти потоки застряли.

Это кажется очень сложным, поэтому я должен спросить: почему вы хотите сами управлять потоками? Не могли бы вы просто использовать пул потоков и оставить обработку Spring/Java?

grekier 10.11.2022 18:46

Похоже, вы зря изобретаете ExecutorService.

Basil Bourque 10.11.2022 19:31

@BasilBourque Будет ли работать ExecutorService в этом сценарии? В некотором смысле, onMessage ведет себя здесь как основной метод, он выполняется многократно. Я хочу заполнить очередь сообщениями, которые она получает, но я не хочу, чтобы onMessage создавал все больше и больше потоков по мере выполнения. Общая идея состоит в том, чтобы поддерживать потоки активными для более чем одного сообщения в очереди.

Filip Klosiewicz 10.11.2022 19:54
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
1
3
54
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий
  1. Создайте параллельную блокирующую очередь. (См. java.util.concurrent.BlockingQueue.)
  2. Продолжайте добавлять свои элементы в эту очередь, совершенно не беспокоясь о том, кто их читает, когда они их читают, если они их читают.
  3. Создайте N потоков, где каждый поток просто работает, не давая никому знать, занят он или нет.
  4. Каждый поток запускает цикл, который считывает следующий элемент из очереди и обрабатывает его. Чтобы прочитать следующий элемент из очереди, используйте метод блокировки из очереди, чтобы поток был заблокирован в ожидании, пока элемент не станет доступным.
  5. Если вы хотите, чтобы потоки прекратили работу, поставьте в очередь N экземпляров специального сигнального значения, которое означает «пожалуйста, закройте». Каждая очередь проверяет это значение и завершает работу.
  6. Вы сделали.

Это очень полезно, спасибо! Однако меня интересует один сценарий. Когда два потока попадают в точку удаления из очереди блокировки одного элемента, один поток будет успешно удален из очереди — как я могу предотвратить создание исключения «Пустой стек/очередь», когда второй поток продолжает извлекать из очереди блокировки после первая отделка? @МайкНакис

Filip Klosiewicz 24.11.2022 05:09

@FilipKlosiewicz Позаботиться именно об этом — часть смысла существования BlockingQueue. Когда 5 потоков заблокированы, ожидая удаления элемента из очереди, и элемент поступает, один из них получит элемент. Остальные 4 останутся ждать. Они никогда не узнают, что элемент был поставлен в очередь и был выбран другим потоком.

Mike Nakis 24.11.2022 07:40

На interwebz есть много примеров, этот, пожалуй, один из лучших: mkyong.com/java/java-blockingqueue-examples

Mike Nakis 24.11.2022 07:45

Другие вопросы по теме