Кажется, что виртуальные потоки блокируют потоки несущей при вызове внешней службы

У меня есть микросервис Spring Boot, который принимает сообщения от RabbitMQ, составляет электронные письма и отправляет их на SMTP-сервер. Он состоит из следующих компонентов:

  1. Отправитель электронной почты, составляющий электронное письмо, отправляет его на SMTP-сервер:
@Service
@RequiredArgsConstructor
public class MessageSender {

    private final JavaMailSender sender;

    public void sendMessage(final RabbitEmailDto emailDto) {
        MimeMessage message = sender.createMimeMessage();
        message.setRecipients(Message.RecipientType.TO, emailDto.getTo());

        MimeMessageHelper helper = new MimeMessageHelper(message, CharEncoding.UTF_8);
        helper.setSubject(emailDto.getData().getEmail().getSubject());
        helper.setText(emailDto.getHtml(), true);
        helper.setFrom(emailDto.getFrom());

        sender.send(message);
    }
}
  1. Процессор сообщений, который получает список сообщений RabbitMQ, для каждого из них вызывает отправителя сообщения в отдельном виртуальном потоке и возвращает список будущих результатов отправки электронных писем.
@Service
@RequiredArgsConstructor
public class MessageProcessor {

    private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
    private final MessageSender messageSender;
    private final Jackson2JsonMessageConverter converter;

    public List<MessageProcessingFuture> processMessages(List<Message> messages) {
        List<MessageProcessingFuture> processingFutures = new ArrayList<>();

        for (Message message : messages) {
            MessageProcessingFuture messageProcessingFuture = new MessageProcessingFuture(
                    message.getMessageProperties().getDeliveryTag(),
                    processMessageAsync(message, executor)
            );
            processingFutures.add(messageProcessingFuture);
        }

        return processingFutures;
    }

    private Future<?> processMessageAsync(final Message message) {
        RabbitEmailDto rabbitEmailDto = (RabbitEmailDto) converter.fromMessage(message);
        MessageDeliverySystemEmailDto email = rabbitEmailDto.getData().getEmail();

        return executor.submit(() -> messageSender.sendMessage(rabbitEmailDto));
    }
}
  1. Прослушиватель сообщений RabbitMQ, который принимает сообщения из очереди Rabbit, передает их процессору, а затем обрабатывает фьючерсы, полученные от процессора, отправляя подтверждение или отклонение в RabbitMQ в зависимости от того, вызвал ли Future.get() исключение или нет. .
@Component
@RequiredArgsConstructor
public class BatchMessageListener implements ChannelAwareBatchMessageListener {

    private final MessageProcessor messageProcessor;

    @Override
    @MeasureExecutionTime
    public void onMessageBatch(final List<Message> messages, final Channel channel) {
        messageProcessor.processMessages(messages)
                .forEach(processingFuture -> processFuture(processingFuture, channel));
    }

    private void processFuture(final MessageProcessingFuture future, final Channel channel) {
        try {
            future.deliveryFuture().get();
            channel.basicAck(future.deliveryTag(), false);
        } catch (Exception e) {
            channel.basicReject(future.deliveryTag(), false);
        }
    }
}

В журналах я вижу, что метод MessageSender.sendMessage действительно выполняется в виртуальном потоке, обозначенном как VirtualThread[#100]/runnable@ForkJoinPool-1-worker-1.

И я вижу, что у меня есть 4 таких рабочих на нашем рабочем сервере. (Правильно ли я, что эти рабочие потоки являются реальными потоками платформы или потоками-носителями?)

Я также вижу, что выполнение метода MessageSender.sendMessage обычно занимает около 1 секунды, причем большая часть этого времени тратится на ожидание ответа от SMTP-сервера.

Основываясь на том, что я узнал о виртуальных потоках, я ожидал, что обработка пакета из 100 сообщений (это мой настроенный размер пакета для BatchMessageListener) займет около 1 секунды, поскольку потоки платформы не будут блокироваться при вызовах SMTP-сервера. . И эти 4 потока платформы будут совместно использоваться 100 виртуальными потоками, что фактически позволит осуществлять 100 практически одновременных вызовов SMTP-сервера.

Однако на практике я заметил, что сообщения обрабатываются по 4 за раз, а обработка всех 100 сообщений занимает около 25 секунд.

Во время локального тестирования на моем компьютере я намеренно ввел задержку в 1 секунду, добавив Thread.sleep(1000); перед строкой sender.send(message); в MessageSender, чтобы имитировать задержку сети. И тогда пакет из 100 сообщений действительно был обработан буквально за 1 секунду, несмотря на то, что по логам у меня было всего 10 потоков несущей.

Я озадачен. Почему потоки оператора связи не блокируются при вызове Thead.sleep, а блокируются при вызове внешнего сервиса? Я делаю что-то не так?

Похоже на закрепление, когда виртуальный поток не отключается от несущего потока. См. статью Закрепление виртуального потока Java Тодда Гинзберга, где вы найдете руководство о том, как обнаружить закрепление. Он описывает, как обнаружить закрепление с помощью (1) ведения журнала и (2) использования Java Flight Recorder (JFR). Закрепление обычно происходит из-за (а) использования synchronized вокруг долго выполняющегося кода или (б) вызова собственного кода через JNI и т. д.

Basil Bourque 19.04.2024 16:55

В конечном итоге вы вызываете Thread.sleep() 100 раз одновременно в одном и том же потоке. Заставляя его эффективно спать намного короче, чем 100 секунд. Это всего лишь предположение.

FuryFart 19.04.2024 17:05

Задержка вашей сети составляет 1 секунду? Тогда вам действительно следует сосредоточиться на исправлении вашей сети.

aled 19.04.2024 19:27

@BasilBourque много танкует. Я вижу, что org.springframework.mail.javamail.JavaMailSender, который я использую, вызывает несколько синхронизированных методов при отправке электронного письма. Правильно ли я понимаю, что в этом случае мало что можно сделать, чтобы извлечь выгоду из виртуальных гусениц?

Alex Kolokolov 19.04.2024 21:16

@AlexKolokolov, можете ли вы рассказать, какие синхронизированные методы вызываются внутри org.springframework.mail.javamail.JavaMailSender? и обновите вопрос, указав соответствующую ему трассировку стека закрепленных тем (-Djdk.tracePinnedThreads).

Naman 20.04.2024 09:33

@Naman метод JavaMailSender.send вызывает синхронизированные методы protocolConnect, getAuthorizationId и sendMessage класса org.eclipse.angus.mail.smtp.SMTPTransport под капотом. Извините, я не могу поделиться фактическими трассировками стека, поскольку работаю над проприетарным программным обеспечением, на которое распространяется соглашение о неразглашении.

Alex Kolokolov 20.04.2024 12:40

Да, я видел несколько методов реализации, которые должны быть синхронизированы с помощью Javadoc, просто хотел подтвердить те из них, которые вы могли наблюдать в трассировке.

Naman 20.04.2024 14:26

Вы написали "Почему потоки-носители не блокируют вызов Thead.sleep, а блокируют вызов внешнего сервиса?" Вы имели в виду булавку на Thread.sleep? Если какой-либо поток, виртуальный или платформенный, не блокируется Thread.sleep, другими словами, ему разрешено работать без приостановки в течение определенного времени, то это гигантская ошибка в реализации JVM. Закрепление и блокировка — совершенно разные явления.

igor.zh 20.04.2024 16:25

Чтобы обнаружить закрепление, вам следует запускать JFR в исполняемом коде, а не пристально следить за исходным кодом. Программисты всех уровней квалификации, как известно, плохо умеют предсказывать проблемы с производительностью во время выполнения.

Basil Bourque 20.04.2024 16:48

@igor.zh Thread.sleep заблокировал виртуальные темы. В моем случае все 100 виртуальных потоков были заблокированы на 1 секунду и освобождены одновременно. И такого поведения я ожидал от виртуальных потоков. Я не заметил никакого закрепления следующего вызова sender.send, поскольку локально он длился всего несколько миллисекунд. Я не использовал термин «закрепление» в своем вопросе, потому что на момент публикации я ничего не знал о закреплении.

Alex Kolokolov 20.04.2024 17:29

@BasilBourque Я обнаружил закрепление с помощью tracePinnedThreads. Я заглянул в исходный код библиотеки, чтобы убедиться, что большая часть логики метода JavaMailSender.send выполняется внутри метода synchronised.

Alex Kolokolov 20.04.2024 17:41

Хорошо, приятно слышать, что Thread.sleep по-прежнему работает так, как ожидалось :) Если это действительно закрепление, не могли бы вы отредактировать свой вопрос и упомянуть закрепление вместо блокировки, чтобы можно было дать более точный ответ? Спасибо за ответ!

igor.zh 20.04.2024 17:45

Закрепление @igor.zh - это ключевой ответ на мой вопрос, его не может быть в самом вопросе. Если бы я знал о закреплении раньше, не было бы и вопросов. Поэтому я не буду редактировать вопрос, чтобы позволить другим, кто еще не знает о закреплении, встретить вопрос и узнать о закреплении из ответа.

Alex Kolokolov 20.04.2024 17:56

@igor.zh вообще-то формулировка вопроса правильная. Термин «блокировка» используется в отношении потока-носителя, что происходит потому, что виртуальный поток закреплен. С точки зрения потока-носителя здесь не происходит ничего особенного. Он заблокирован так же, как могут быть заблокированы другие потоки платформы.

Holger 24.04.2024 09:37

@Хольгер, да, если закрепление может произойти только тогда, когда поток-носитель блокируется, как предлагает Oracle. Однако обратите внимание, что изначально я столкнулся с утверждением «потоки-носители не блокируются в Thread.sleep call», что практически невозможно, но имело бы смысл, если бы блокировку заменили закреплением.

igor.zh 24.04.2024 18:17

@igor.zh все наоборот: поток-носитель может быть заблокирован, когда виртуальный поток закреплен (и выполняет блокирующее действие, пока закреплен). Закрепление означает только то, что виртуальный поток не может отключиться от своего несущего потока. Этот термин не является заменой блокировки. Таким образом, исходное утверждение действительно, если вы помните, что виртуальный поток выполняет вызов sleep: «поток-носитель (обычно) не блокирует вызов sleep виртуального потока», но учтите, что виртуальный поток может быть уже закреплен , например при вызове sleep внутри блока synchronized

Holger 25.04.2024 11:58

@Holger, как может такое случиться, что поток-носитель (обычно) не блокирует (!) вызов сна виртуального потока». Thread.sleep должен заблокировать любую тему, разве это не единственная его цель? В его javadoc говорится: «Приводит... поток... временно прекращает выполнение». Я неправильно прочитал javadoc или у нас разное понимание блокировки потоков? Блокировка для меня это LockSupport.park, вход в раздел synchronized на конкурирующем мониторе, join и конечно sleep. Ожидание вращения и CAS, например. в AQS неблокируются. Что мне здесь не хватает?

igor.zh 26.04.2024 04:09

@igor.zh, когда виртуальный поток выполняет вызов sleep, виртуальный поток блокируется. Поток-носитель не блокируется, но продолжит выполнение другого виртуального потока. В этом вся суть виртуальных потоков. Так обычно и происходит. Но когда виртуальный поток закрепляется при выполнении вызова sleep, поток несущей также блокируется. Вот что значит pinned, невозможность отмонтировать, поэтому несущий поток не может сделать что-то еще.

Holger 26.04.2024 09:14

@Хольгер, да, это имеет смысл. Оператор связи, когда он спотыкается об операции блокировки, просто отправляется в другой виртуальный поток или возвращается в ForkJoinPool — в идеале. В этом смысле и виртуальная, и платформа блокируются на sleep, а оператор связи (опять же платформа :) ) — нет. Вопрос: когда вы сказали, что «поток-носитель (обычно) не блокируется в виртуальном потоке», вы обычно имели в виду, что когда он блокируется, это является причиной (или веской причиной) для закрепления виртуального потока, установлен на этом носителе?

igor.zh 27.04.2024 15:03

Причина @igor.zh, по которой поток-носитель блокируется, когда виртуальный поток выполняет вызов sleep, может заключаться в том, что виртуальный поток закреплен или вы используете пользовательскую реализацию jre, где поддержка виртуального потока для sleep не реализована ( еще).

Holger 30.04.2024 11:13
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
3
20
369
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Закрепление

Похоже на закрепление, когда виртуальный поток не отключается от несущего потока.

Закрепление обычно происходит из-за:

  • Использование synchronized в долго выполняющемся коде.
  • Вызов машинного кода через Java Native Interface (JNI) и т. д.

Если у вас есть четыре потока-носителя (потоки платформы, предназначенные для обслуживания виртуальных потоков), и эти виртуальные потоки закреплены большую часть времени, то вы фактически ограничили пропускную способность работы этими четырьмя потоками-носителями. Оставшиеся 96 задач из 100 должны дождаться завершения первых четырех, затем 92 — завершения следующих четырех и так далее. В таком случае вам следует использовать потоки платформы, а не виртуальные потоки. Виртуальные потоки не приносят никакой пользы и фактически создают дополнительную работу.

Обнаружение закрепления

См. статью Закрепление виртуального потока Java Тодда Гинзберга, где вы найдете руководство о том, как обнаружить закрепление. Он описывает, как обнаружить закрепление:

Вы можете просмотреть результаты JFR с помощью JDK Mission Control (JMC).

Обратите внимание, что вы можете настроить порог обнаружения закрепления. Я смутно припоминаю, что значение по умолчанию составляет 20 миллисекунд. Но вам следует проверить и определить свою собственную полезную ценность.

Обходные пути/альтернативы

Вы позже прокомментировали:

Я вижу, что org.springframework.mail.javamail.JavaMailSender, который я использую, вызывает несколько синхронизированных методов при отправке электронного письма. Правильно ли я понимаю, что в этом случае мало что можно сделать, чтобы извлечь выгоду из виртуальных гусениц?

Правильно… если этот код synchronized работает долго.

Виртуальные потоки не подходят для задач, требующих длительного выполнения кода, который является либо synchronized, либо собственным (JNI и т. д.). В обоих случаях планировщик виртуальных потоков JVM не может определить, когда такой код заблокирован, поэтому он остается назначенным потоку-носителю платформы.

Для случайных встреч эта ситуация не имеет большого значения. Но для повторяющихся или продолжительных столкновений эта ситуация означает, что вы не получите никакой пользы от виртуальных потоков, а виртуальные потоки могут фактически ухудшить общую производительность. Похоже, ваш случай соответствует категории «повторяющиеся или продолжительные встречи».

Команда Project Loom продолжает искать способы обойти synchronized ограничение. Но все еще проблема с Java 21 и 22.

Замена synchronized на ReentrantLock

Если код synchronized принадлежит вам, замените любое длительное использование synchronized на ReentrantLock, чтобы восстановить эффективность с помощью виртуальных потоков. Это изменение достаточно простое.

Некоторые люди неверно истолковали это руководство как «замените все использование synchronized на ReentrantLock». В этом нет необходимости. Модифицировать нужно только долго работающий код synchronized. Краткий код, например, для защиты доступа к обычно доступной переменной, можно оставить на месте, поскольку на код synchronized тратится мало времени.

Если вы не можете изменить исходный код долго выполняющихся synchronized задач, используйте потоки платформы, а не виртуальные потоки. Но имейте в виду, что если эти задачи имеют несколько подзадач, которые могут быть многопоточными, задача в потоке платформы может выиграть от выполнения этих подзадач с виртуальными потоками.

Для получения дополнительной информации

Я не эксперт в этом. Так что слушайте экспертов, а не меня.

  • Изучите документ с основными требованиями, описывающий эту функцию, JEP 444: Виртуальные потоки
  • Посмотрите видеопрезентации членов команды Project Loom Алана Бейтмана и Рона Пресслера.
  • См. разъяснительные беседы Хосе Помара.
  • Время от времени заглядывайте на страницу проекта Project Loom.

Другие вопросы по теме