У меня есть микросервис Spring Boot, который принимает сообщения от RabbitMQ, составляет электронные письма и отправляет их на SMTP-сервер. Он состоит из следующих компонентов:
@Service
@RequiredArgsConstructor
public class MessageSender {
private final JavaMailSender sender;
public void sendMessage(final RabbitEmailDto emailDto) {
MimeMessage message = sender.createMimeMessage();
message.setRecipients(Message.RecipientType.TO, emailDto.getTo());
MimeMessageHelper helper = new MimeMessageHelper(message, CharEncoding.UTF_8);
helper.setSubject(emailDto.getData().getEmail().getSubject());
helper.setText(emailDto.getHtml(), true);
helper.setFrom(emailDto.getFrom());
sender.send(message);
}
}
@Service
@RequiredArgsConstructor
public class MessageProcessor {
private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
private final MessageSender messageSender;
private final Jackson2JsonMessageConverter converter;
public List<MessageProcessingFuture> processMessages(List<Message> messages) {
List<MessageProcessingFuture> processingFutures = new ArrayList<>();
for (Message message : messages) {
MessageProcessingFuture messageProcessingFuture = new MessageProcessingFuture(
message.getMessageProperties().getDeliveryTag(),
processMessageAsync(message, executor)
);
processingFutures.add(messageProcessingFuture);
}
return processingFutures;
}
private Future<?> processMessageAsync(final Message message) {
RabbitEmailDto rabbitEmailDto = (RabbitEmailDto) converter.fromMessage(message);
MessageDeliverySystemEmailDto email = rabbitEmailDto.getData().getEmail();
return executor.submit(() -> messageSender.sendMessage(rabbitEmailDto));
}
}
@Component
@RequiredArgsConstructor
public class BatchMessageListener implements ChannelAwareBatchMessageListener {
private final MessageProcessor messageProcessor;
@Override
@MeasureExecutionTime
public void onMessageBatch(final List<Message> messages, final Channel channel) {
messageProcessor.processMessages(messages)
.forEach(processingFuture -> processFuture(processingFuture, channel));
}
private void processFuture(final MessageProcessingFuture future, final Channel channel) {
try {
future.deliveryFuture().get();
channel.basicAck(future.deliveryTag(), false);
} catch (Exception e) {
channel.basicReject(future.deliveryTag(), false);
}
}
}
В журналах я вижу, что метод MessageSender.sendMessage действительно выполняется в виртуальном потоке, обозначенном как VirtualThread[#100]/runnable@ForkJoinPool-1-worker-1.
И я вижу, что у меня есть 4 таких рабочих на нашем рабочем сервере. (Правильно ли я, что эти рабочие потоки являются реальными потоками платформы или потоками-носителями?)
Я также вижу, что выполнение метода MessageSender.sendMessage обычно занимает около 1 секунды, причем большая часть этого времени тратится на ожидание ответа от SMTP-сервера.
Основываясь на том, что я узнал о виртуальных потоках, я ожидал, что обработка пакета из 100 сообщений (это мой настроенный размер пакета для BatchMessageListener) займет около 1 секунды, поскольку потоки платформы не будут блокироваться при вызовах SMTP-сервера. . И эти 4 потока платформы будут совместно использоваться 100 виртуальными потоками, что фактически позволит осуществлять 100 практически одновременных вызовов SMTP-сервера.
Однако на практике я заметил, что сообщения обрабатываются по 4 за раз, а обработка всех 100 сообщений занимает около 25 секунд.
Во время локального тестирования на моем компьютере я намеренно ввел задержку в 1 секунду, добавив Thread.sleep(1000); перед строкой sender.send(message); в MessageSender, чтобы имитировать задержку сети. И тогда пакет из 100 сообщений действительно был обработан буквально за 1 секунду, несмотря на то, что по логам у меня было всего 10 потоков несущей.
Я озадачен. Почему потоки оператора связи не блокируются при вызове Thead.sleep, а блокируются при вызове внешнего сервиса? Я делаю что-то не так?
В конечном итоге вы вызываете Thread.sleep() 100 раз одновременно в одном и том же потоке. Заставляя его эффективно спать намного короче, чем 100 секунд. Это всего лишь предположение.
Задержка вашей сети составляет 1 секунду? Тогда вам действительно следует сосредоточиться на исправлении вашей сети.
@BasilBourque много танкует. Я вижу, что org.springframework.mail.javamail.JavaMailSender, который я использую, вызывает несколько синхронизированных методов при отправке электронного письма. Правильно ли я понимаю, что в этом случае мало что можно сделать, чтобы извлечь выгоду из виртуальных гусениц?
@AlexKolokolov, можете ли вы рассказать, какие синхронизированные методы вызываются внутри org.springframework.mail.javamail.JavaMailSender? и обновите вопрос, указав соответствующую ему трассировку стека закрепленных тем (-Djdk.tracePinnedThreads).
@Naman метод JavaMailSender.send вызывает синхронизированные методы protocolConnect, getAuthorizationId и sendMessage класса org.eclipse.angus.mail.smtp.SMTPTransport под капотом. Извините, я не могу поделиться фактическими трассировками стека, поскольку работаю над проприетарным программным обеспечением, на которое распространяется соглашение о неразглашении.
Да, я видел несколько методов реализации, которые должны быть синхронизированы с помощью Javadoc, просто хотел подтвердить те из них, которые вы могли наблюдать в трассировке.
Вы написали "Почему потоки-носители не блокируют вызов Thead.sleep, а блокируют вызов внешнего сервиса?" Вы имели в виду булавку на Thread.sleep? Если какой-либо поток, виртуальный или платформенный, не блокируется Thread.sleep, другими словами, ему разрешено работать без приостановки в течение определенного времени, то это гигантская ошибка в реализации JVM. Закрепление и блокировка — совершенно разные явления.
Чтобы обнаружить закрепление, вам следует запускать JFR в исполняемом коде, а не пристально следить за исходным кодом. Программисты всех уровней квалификации, как известно, плохо умеют предсказывать проблемы с производительностью во время выполнения.
@igor.zh Thread.sleep заблокировал виртуальные темы. В моем случае все 100 виртуальных потоков были заблокированы на 1 секунду и освобождены одновременно. И такого поведения я ожидал от виртуальных потоков. Я не заметил никакого закрепления следующего вызова sender.send, поскольку локально он длился всего несколько миллисекунд. Я не использовал термин «закрепление» в своем вопросе, потому что на момент публикации я ничего не знал о закреплении.
@BasilBourque Я обнаружил закрепление с помощью tracePinnedThreads. Я заглянул в исходный код библиотеки, чтобы убедиться, что большая часть логики метода JavaMailSender.send выполняется внутри метода synchronised.
Хорошо, приятно слышать, что Thread.sleep по-прежнему работает так, как ожидалось :) Если это действительно закрепление, не могли бы вы отредактировать свой вопрос и упомянуть закрепление вместо блокировки, чтобы можно было дать более точный ответ? Спасибо за ответ!
Закрепление @igor.zh - это ключевой ответ на мой вопрос, его не может быть в самом вопросе. Если бы я знал о закреплении раньше, не было бы и вопросов. Поэтому я не буду редактировать вопрос, чтобы позволить другим, кто еще не знает о закреплении, встретить вопрос и узнать о закреплении из ответа.
@igor.zh вообще-то формулировка вопроса правильная. Термин «блокировка» используется в отношении потока-носителя, что происходит потому, что виртуальный поток закреплен. С точки зрения потока-носителя здесь не происходит ничего особенного. Он заблокирован так же, как могут быть заблокированы другие потоки платформы.
@Хольгер, да, если закрепление может произойти только тогда, когда поток-носитель блокируется, как предлагает Oracle. Однако обратите внимание, что изначально я столкнулся с утверждением «потоки-носители не блокируются в Thread.sleep call», что практически невозможно, но имело бы смысл, если бы блокировку заменили закреплением.
@igor.zh все наоборот: поток-носитель может быть заблокирован, когда виртуальный поток закреплен (и выполняет блокирующее действие, пока закреплен). Закрепление означает только то, что виртуальный поток не может отключиться от своего несущего потока. Этот термин не является заменой блокировки. Таким образом, исходное утверждение действительно, если вы помните, что виртуальный поток выполняет вызов sleep: «поток-носитель (обычно) не блокирует вызов sleep виртуального потока», но учтите, что виртуальный поток может быть уже закреплен , например при вызове sleep внутри блока synchronized
@Holger, как может такое случиться, что поток-носитель (обычно) не блокирует (!) вызов сна виртуального потока». Thread.sleep должен заблокировать любую тему, разве это не единственная его цель? В его javadoc говорится: «Приводит... поток... временно прекращает выполнение». Я неправильно прочитал javadoc или у нас разное понимание блокировки потоков? Блокировка для меня это LockSupport.park, вход в раздел synchronized на конкурирующем мониторе, join и конечно sleep. Ожидание вращения и CAS, например. в AQS неблокируются. Что мне здесь не хватает?
@igor.zh, когда виртуальный поток выполняет вызов sleep, виртуальный поток блокируется. Поток-носитель не блокируется, но продолжит выполнение другого виртуального потока. В этом вся суть виртуальных потоков. Так обычно и происходит. Но когда виртуальный поток закрепляется при выполнении вызова sleep, поток несущей также блокируется. Вот что значит pinned, невозможность отмонтировать, поэтому несущий поток не может сделать что-то еще.
@Хольгер, да, это имеет смысл. Оператор связи, когда он спотыкается об операции блокировки, просто отправляется в другой виртуальный поток или возвращается в ForkJoinPool — в идеале. В этом смысле и виртуальная, и платформа блокируются на sleep, а оператор связи (опять же платформа :) ) — нет. Вопрос: когда вы сказали, что «поток-носитель (обычно) не блокируется в виртуальном потоке», вы обычно имели в виду, что когда он блокируется, это является причиной (или веской причиной) для закрепления виртуального потока, установлен на этом носителе?
Причина @igor.zh, по которой поток-носитель блокируется, когда виртуальный поток выполняет вызов sleep, может заключаться в том, что виртуальный поток закреплен или вы используете пользовательскую реализацию jre, где поддержка виртуального потока для sleep не реализована ( еще).




Похоже на закрепление, когда виртуальный поток не отключается от несущего потока.
Закрепление обычно происходит из-за:
synchronized в долго выполняющемся коде.Если у вас есть четыре потока-носителя (потоки платформы, предназначенные для обслуживания виртуальных потоков), и эти виртуальные потоки закреплены большую часть времени, то вы фактически ограничили пропускную способность работы этими четырьмя потоками-носителями. Оставшиеся 96 задач из 100 должны дождаться завершения первых четырех, затем 92 — завершения следующих четырех и так далее. В таком случае вам следует использовать потоки платформы, а не виртуальные потоки. Виртуальные потоки не приносят никакой пользы и фактически создают дополнительную работу.
См. статью Закрепление виртуального потока Java Тодда Гинзберга, где вы найдете руководство о том, как обнаружить закрепление. Он описывает, как обнаружить закрепление:
Вы можете просмотреть результаты JFR с помощью JDK Mission Control (JMC).
Обратите внимание, что вы можете настроить порог обнаружения закрепления. Я смутно припоминаю, что значение по умолчанию составляет 20 миллисекунд. Но вам следует проверить и определить свою собственную полезную ценность.
Вы позже прокомментировали:
Я вижу, что org.springframework.mail.javamail.JavaMailSender, который я использую, вызывает несколько синхронизированных методов при отправке электронного письма. Правильно ли я понимаю, что в этом случае мало что можно сделать, чтобы извлечь выгоду из виртуальных гусениц?
Правильно… если этот код synchronized работает долго.
Виртуальные потоки не подходят для задач, требующих длительного выполнения кода, который является либо synchronized, либо собственным (JNI и т. д.). В обоих случаях планировщик виртуальных потоков JVM не может определить, когда такой код заблокирован, поэтому он остается назначенным потоку-носителю платформы.
Для случайных встреч эта ситуация не имеет большого значения. Но для повторяющихся или продолжительных столкновений эта ситуация означает, что вы не получите никакой пользы от виртуальных потоков, а виртуальные потоки могут фактически ухудшить общую производительность. Похоже, ваш случай соответствует категории «повторяющиеся или продолжительные встречи».
Команда Project Loom продолжает искать способы обойти synchronized ограничение. Но все еще проблема с Java 21 и 22.
synchronized на ReentrantLockЕсли код synchronized принадлежит вам, замените любое длительное использование synchronized на ReentrantLock, чтобы восстановить эффективность с помощью виртуальных потоков. Это изменение достаточно простое.
Некоторые люди неверно истолковали это руководство как «замените все использование synchronized на ReentrantLock». В этом нет необходимости. Модифицировать нужно только долго работающий код synchronized. Краткий код, например, для защиты доступа к обычно доступной переменной, можно оставить на месте, поскольку на код synchronized тратится мало времени.
Если вы не можете изменить исходный код долго выполняющихся synchronized задач, используйте потоки платформы, а не виртуальные потоки. Но имейте в виду, что если эти задачи имеют несколько подзадач, которые могут быть многопоточными, задача в потоке платформы может выиграть от выполнения этих подзадач с виртуальными потоками.
Я не эксперт в этом. Так что слушайте экспертов, а не меня.
Похоже на закрепление, когда виртуальный поток не отключается от несущего потока. См. статью Закрепление виртуального потока Java Тодда Гинзберга, где вы найдете руководство о том, как обнаружить закрепление. Он описывает, как обнаружить закрепление с помощью (1) ведения журнала и (2) использования Java Flight Recorder (JFR). Закрепление обычно происходит из-за (а) использования
synchronizedвокруг долго выполняющегося кода или (б) вызова собственного кода через JNI и т. д.