Я запускаю приложение SpringBoot с бизнес-службой, которая постоянно запускает метод, который заполняет глобальную очередь своими сообщениями. Внутри этого метода, поскольку он запускается более 15-30 раз в секунду, метод проверяет глобальный массив логических значений, чтобы убедиться, что поток с определенным индексом (0-7) не запущен. Если поток не запущен, метод запускает поток. Каждый поток отвечает за чтение из глобальной очереди сообщений и выполнение задачи над этим сообщением. Однако я заметил, что по мере того, как приложение работает дольше, я вижу только один из этих отдельных потоков, выполняющих операцию, в то время как другие зависают.
Мой вопрос, если у меня есть:
public static void onMessage(String record) {
global.add(record);
if (threads[0] == false) {
threads[0] = true;
thread0.start() // Name of the thread index included in runnable
> }
В основном вышеперечисленное постоянно срабатывает, заполняет очередь, пытается запустить поток и все.
Тогда у меня есть:
public void run() {
String recordToUse;
int thread_num = Integer.parseInt(Thread.currentThread().getName());
long startThreadTime = System.currentTimeMillis();
long endThreadTime = startThreadTime + 60 * 1000; // Run the thread for 1 minute.
while(System.currentTimeMillis() < endThreadTime) {
if (!global.isEmpty()) {
recordToUse = global.remove();
System.out.println("Successful removal: Thread-"+ thread_num);
} else {
continue; // If the queue is empty, keep checking until it is not empty.
}
// Then we have more operations that work on the message
}
threads[thread_num] = false; // Mark that this thread is now finished and the onMessage method knows it can start another thread.
return;
}
Реализация runnable принимает число в качестве аргумента, чтобы просто сохранить номер потока внутри потока, поэтому у нас есть что-то вроде Thread thread1 = new Thread(runnable,"0"); чтобы указанный выше поток знал, что это индекс номер 0.
После того, как приложение работает по назначению, я вижу, что сначала из 8 намеченных потоков сначала оно работает нормально и отображает сообщения вроде:
Successful removal: Thread-2
Successful removal: Thread-0
Successful removal: Thread-5
Successful removal: Thread-7
Successful removal: Thread-4
Successful removal: Thread-1
Но примерно через 5-10 минут он фокусируется на одном потоке. Я вижу только сообщения типа:
Successful removal: Thread-6
Successful removal: Thread-6
Successful removal: Thread-6
Successful removal: Thread-6
Successful removal: Thread-6
Successful removal: Thread-6
Я не совсем уверен, что происходит, и некоторые указания были бы замечательными! Моя идея заключается в том, что поток обращается к синхронизированному методу, а все остальные потоки ждут его, но я не уверен, что это так, поскольку все они изначально работают.
Спасибо.
Я пытался засыпать потоки здесь и там, я играл с продолжением потока, но я не вижу, где эти потоки застряли.
Похоже, вы зря изобретаете ExecutorService
.
@BasilBourque Будет ли работать ExecutorService в этом сценарии? В некотором смысле, onMessage ведет себя здесь как основной метод, он выполняется многократно. Я хочу заполнить очередь сообщениями, которые она получает, но я не хочу, чтобы onMessage создавал все больше и больше потоков по мере выполнения. Общая идея состоит в том, чтобы поддерживать потоки активными для более чем одного сообщения в очереди.
java.util.concurrent.BlockingQueue
.)Это очень полезно, спасибо! Однако меня интересует один сценарий. Когда два потока попадают в точку удаления из очереди блокировки одного элемента, один поток будет успешно удален из очереди — как я могу предотвратить создание исключения «Пустой стек/очередь», когда второй поток продолжает извлекать из очереди блокировки после первая отделка? @МайкНакис
@FilipKlosiewicz Позаботиться именно об этом — часть смысла существования BlockingQueue. Когда 5 потоков заблокированы, ожидая удаления элемента из очереди, и элемент поступает, один из них получит элемент. Остальные 4 останутся ждать. Они никогда не узнают, что элемент был поставлен в очередь и был выбран другим потоком.
На interwebz есть много примеров, этот, пожалуй, один из лучших: mkyong.com/java/java-blockingqueue-examples
Это кажется очень сложным, поэтому я должен спросить: почему вы хотите сами управлять потоками? Не могли бы вы просто использовать пул потоков и оставить обработку Spring/Java?