Управление соединениями JDBC с использованием виртуальных потоков Java

У нас есть служба на Java 17, которая выполняет логику на нашей стороне и отправляет запрос в стороннюю систему. Время ответа сторонней системы составляет около 800–1400 мс. Для этого у нас есть ThreadPoolExecutor размером 12 (нельзя увеличивать его дальше из-за ограничений инфраструктуры)

Наша скорость отправки запросов составляла 5–8 запросов в секунду из-за времени ответа сторонней системы. Чтобы увеличить пропускную способность, мы обновляем Java 21, чтобы использовать виртуальные потоки. Однако во время тестирования разработки, похоже, возникла еще одна проблема.

Наша скорость отправки запросов увеличилась примерно до 50 запросов в секунду. Но мы сталкиваемся с исключениями, связанными с сбоем запроса на соединение JDBC. Размер моего пула подключений к базе данных должен быть около 100.

Когда моя служба запускается, к моему микросервису принадлежит 10 подключений. При отправке запросов с помощью ThreadPoolExecutor: 13 дополнительных подключений, всего 23. При использовании newVirtualThreadPerTaskExecutor: всего 94 соединения.

Я использую следующий запрос для мониторинга моей базы данных PostGre V13.9.

SELECT pid, datname, usename, application_name, client_addr, client_port, backend_start, query_start, state_change, query, state
FROM pg_stat_activity where application_name ='PostgreSQL JDBC Driver';

Исключения:

org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction
    at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:466) ~[spring-orm-6.1.2.jar:6.1.2]
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.startTransaction(AbstractPlatformTransactionManager.java:531) ~[spring-tx-6.1.2.jar:6.1.2]
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:405) ~[spring-tx-6.1.2.jar:6.1.2]
    at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:610) ~[spring-tx-6.1.2.jar:6.1.2]
    at java.base/java.util.concurrent.ThreadPerTaskExecutor$TaskRunner.run(ThreadPerTaskExecutor.java:314) ~[na:na]
    at java.base/java.lang.VirtualThread.run(VirtualThread.java:309) ~[na:na]
Caused by: org.hibernate.exception.GenericJDBCException: Unable to acquire JDBC Connection [FATAL: remaining connection slots are reserved for non-replication superuser connections] [n/a]
at java.base/java.util.concurrent.ThreadPerTaskExecutor$TaskRunner.run(ThreadPerTaskExecutor.java:314) ~[na:na]
    at java.base/java.lang.VirtualThread.run(VirtualThread.java:309) ~[na:na]
Caused by: org.hibernate.exception.GenericJDBCException: Unable to acquire JDBC Connection [FATAL: remaining connection slots are reserved for non-replication superuser connections] [n/a]
    at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:63) ~[hibernate-core-6.4.1.Final.jar:6.4.1.Final]

Я понимаю, что при использовании ThreadPoolExecutor из 12 потоков каждый поток устанавливает соединение и использует его. При использовании виртуальных потоков, как только какой-либо виртуальный поток занят ожиданием ответа или Thread.sleep, другой виртуальный поток берет на себя инициативу. Как только я запускаю нагрузочный тест, я замечаю всплеск подключений, некоторые из которых приводят к исключениям (например, 11 запросов не удалось из 1500 (из-за ошибки JDBC)), а затем процесс продвигается плавно, используя максимум 94 соединения. на данный момент это у меня есть.

Вопрос в том, как мне решить эту проблему и ограничить количество одновременно запускаемых процессов. И что еще более важно, ограничьте количество подключений/подключений повторного использования, создаваемых к базе данных. У меня есть несколько других приложений в базе данных, поэтому я не смогу их значительно увеличить.

Я попытался использовать свойство системы "jdk.virtualThreadScheduler.maxPoolSize", передав аргумент VM -Djdk.virtualThreadScheduler.maxPoolSize=5

Но исключение все равно возникает, даже если установлено -Djdk.virtualThreadScheduler.maxPoolSize=1.

Что мне не хватает? Означает ли это, что виртуальные потоки следует использовать для основных задач ввода-вывода, задач ЦП, а не для связи со сторонними системами?

Некоторый код того, как инициируются мои виртуальные потоки:

this.executorService = Executors.newSingleThreadScheduledExecutor();
this.taskExecutorService = Executors.newVirtualThreadPerTaskExecutor();

this.executorService.scheduleAtFixedRate(this, 0L, this.pollingTime.toMillis(), TimeUnit.MILLISECONDS); //Called once when service starts

public void run() {
    this.taskExecutorService.execute(//my runnable job);
}

Я также пытался использовать счетчики/семафоры для ограничения виртуальных потоков. Это снижает пропускную способность, но по-прежнему создает больше новых соединений по сравнению с ThreadPoolExecutor. Это спроектировано таким образом? Я подозреваю, что каждый виртуальный поток/процесс создает новое соединение, тогда как в ThreadPoolExecutor он использует соединение, назначенное его PID. Любые разъяснения или предложения, пожалуйста?

Обновлено: (Код подключения JDBC) Я вижу большинство запросов на удаление в открытых соединениях, поскольку именно это выполняется при отправке запросов через виртуальные потоки. Ниже приведен код.

public Optional<Job> getNextJob(String queue) {
    Optional<Job> job = Optional.empty();

    try {
        Connection connection = this.dataSource.getConnection();

        try {
            String sql = "DELETE FROM scheduler_task WHERE id = (SELECT id FROM scheduler_task WHERE queue = ? and trigger_date < now() LIMIT 1 FOR UPDATE SKIP LOCKED) RETURNING id, queue, reference_id, trigger_date";
            PreparedStatement stmt = connection.prepareStatement(sql);

            try {
                ResultSet resultSet = stmt.executeQuery();

                try {
                    if (resultSet.next()) {
                        job = Optional.of(this.mapToJob(resultSet));
                    }
                } catch (Throwable var14) {
                    if (resultSet != null) {
                        try {
                            resultSet.close();
                        } catch (Throwable var13) {
                            var14.addSuppressed(var13);
                        }
                    }
                    throw var14;
                }

                if (resultSet != null) {
                    resultSet.close();
                }
            } catch (Throwable var15) {
                if (stmt != null) {
                    try {
                        stmt.close();
                    } catch (Throwable var12) {
                        var15.addSuppressed(var12);
                    }
                }

                throw var15;
            }
            if (stmt != null) {
                stmt.close();
            }
        } catch (Throwable var16) {
            if (connection != null) {
                try {
                    connection.close();
                } catch (Throwable var11) {
                    var16.addSuppressed(var11);
                }
            }

            throw var16;
        }
        if (connection != null) {
            connection.close();
        }

        return job;
    } catch (SQLException var17) {
        log.error("error", var17);
        throw new Exception(var17);
    }
}

Не уверен, что вы спрашиваете. Хотите ограничить количество задач, выполняющих работу с базой данных? Если да, используйте Семафор с рядом разрешений, настроенных на желаемое максимальное количество одновременных задач базы данных.

Basil Bourque 16.05.2024 01:08

Вы не получите никаких полезных ответов, пока не продемонстрируете некоторый код, который создает соединения JDBC и освобождает их.

user207421 16.05.2024 03:02

@BasilBourque Я использовал семафоры для управления количеством виртуальных потоков одновременно. Например, 20 или 50. Но все равно создается больше соединений, чем разрешенных виртуальных потоков, например 70,80,90. Этого не происходит с обычным пулом потоков. Это когда соединение открывается виртуальным потоком, оно закрывается после выполнения всех операций ввода-вывода, в то время как другие входящие виртуальные потоки открывают новые соединения...

Shoaib 16.05.2024 08:23

@user207421 user207421 добавил код JDBC.

Shoaib 16.05.2024 08:36

Кстати, используйте синтаксис try-with-resources для автоматического закрытия ресурсов JDBC: Connection, ReadedStatement, ResultSet.

Basil Bourque 16.05.2024 08:58

См. документацию . И смотрите мой Ответ.

Basil Bourque 16.05.2024 09:05

@BasilBourque Честно говоря, код в вопросе выглядит как декомпилированный байт-код, где в исходном источнике использовалась попытка с ресурсами.

Mark Rotteveel 16.05.2024 09:29

Это верно, исходный код имеет синтаксис try-with-resources. Я не разместил это, поскольку могли бы возникнуть споры о том, закрывает ли это соединение, потому что оно явно не показывает закрытие соединения. Я предполагаю, что проблема в том, что виртуальные потоки необходимо ограничить. Если в любом случае количество моих виртуальных потоков увеличится до 200 за раз, и у меня будет 100 подключений к БД, то они столкнутся с исключениями, верно? Какой наиболее эффективный способ ограничить виртуальные потоки?

Shoaib 16.05.2024 12:53

Вы используете Хикари для доступа к базе данных? По умолчанию он поставляется с Spring Boot Data. Если да, то с этим проблема github.com/brettwooldridge/HikariCP/issues/2151

Jose Antonio 16.05.2024 16:36

Помимо технических проблем с растущим количеством соединений, я не совсем понимаю, чего вы хотели добиться переключением на ограниченное количество виртуальных потоков в вашем случае ответа в районе 800-1400мс. Переключение с потоков платформы на ограниченное количество виртуальных потоков с использованием семафоров и тому подобного может принести некоторую пользу, но это вряд ли применимо к вашему случаю таких медленных ответов. Какую пользу вы могли бы получить от такого переключения? Сэкономить несколько циклов ЦП на более быстром переключении контекста для виртуальных потоков, чем для платформы?

igor.zh 16.05.2024 17:02

Я стремлюсь к тому, чтобы ответ задерживался, а не 10 потоков платформ, работающих в пуле каждые 0,8-1,4 секунды, использовать виртуальные потоки для инициации новых запросов и не имеет значения, когда приходит ответ. Итак, если у меня есть 10 000 запросов, они могут быть выполнены (без ответа) за меньшее время, например, 2 минуты, вместо того, чтобы отправлять + получать ответ и выполнять все запросы, например, за 20 минут. Так что, думаю, я просто хочу максимизировать свою утилиту подключений к базе данных. . Например, я не могу позволить себе заблокировать более 12 потоков, но я могу позволить себе использовать все 90 моих подключений к базе данных.

Shoaib 17.05.2024 11:56
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
11
870
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Вы используете Хикари для доступа к базе данных? По умолчанию он поставляется с Spring Boot Data. Если да, то с этим проблема

https://github.com/brettwooldridge/HikariCP/issues/2151

По сути, после создания соединение остается в хранилище ThreadLocal. При использовании пула потоков при первом обращении потока к базе данных он создает соединение и сохраняет его в пространстве ThreadLocal своего потока. Если этот поток используется повторно, как в случае с пулом потоков, соединение уже существует и используется повторно. Однако при работе с виртуальными потоками потоки создаются и уничтожаются непрерывно, поэтому их пространство ThreadLocal очищается, поэтому соединения создаются и уничтожаются один раз для каждой задачи. В вашем случае, если у вас есть 1000 потоков, пытающихся получить доступ к базе данных, они могут попытаться создать 1000 подключений.

Спасибо. Кажется, это та же самая проблема. Но как я могу быть уверен, что мой проект использует Hikari? Я использую JDK 21, Spring Boot 3.2.1. com.querydsl querydsl-apt 5.0.0 <classifier>jakarta</classifier> , javax.sql.DataSource. Насколько я знаю, логи показывают спящий режим

Shoaib 17.05.2024 11:50

Разве реализации виртуальных потоков не должны быть такими: они выполняются, но соединения создаются потоками платформы. Поэтому, когда виртуальный поток назначается потоку платформы, он будет повторно использовать соединение потока платформы. Что-то вроде того. Вместо того, чтобы виртуальные потоки создавали новые соединения.

Shoaib 17.05.2024 12:07

Возможно, дело в том, что это происходит именно так (т. е. потоки платформы создают новые соединения, а не виртуальные потоки). Но если, например, общее количество потоков платформы составляет 256, то мы не можем быть уверены, что если мы установим ограничение потоков платформы на 90, тогда вашим виртуальным потокам будет назначено только 90 одинаковых потоков платформы. Вместо этого будет использоваться любой из 90 из 256 потоков платформы, что приведет к созданию уникальных соединений. Просто некоторые мысли.

Shoaib 17.05.2024 12:10

Мы используем Tomcat JDBC.

Shoaib 20.05.2024 22:21

@Shoaib Просто взглянув на логи, вы должны увидеть некоторые из них из пакетов хикари. Кроме того, если он находится в пути к классам (смотря на дерево зависимостей maven), скорее всего, он используется, поскольку он настроен по умолчанию в Spring Boot.

Jose Antonio 21.05.2024 12:56

Кроме того, это не работает так. Виртуальные потоки используют несколько реальных потоков ОС (обычно столько же, сколько и ядер), но концепция ThreadLocal связана с потоками Java, а не с потоками ОС. Каждый поток Java имеет собственное пространство ThreadLocal. В традиционных потоках, поскольку поток Java является потоком ОС и потоки используются повторно, пространство ThreadLocal сохраняется от одной задачи к другой. В виртуальных потоках каждая задача выполняется в новом потоке Java, который имеет собственное новое пространство ThreadLocal, и они монтируются поверх потоков ОС, но, поскольку ThreadLocal является чем-то связанным с Java, он теряется.

Jose Antonio 21.05.2024 12:59

На самом деле я пришел к выводу, что это именно то, в чем моя проблема. У меня 100 связей. Когда я использую newVirtualThreadPerTaskExecutor, количество виртуальных потоков не ограничено, и они инициируют задачи, в результате которых создаются соединения. Когда они превышают, я получаю ошибки пула. Если я использую семафоры и разрешаю только 50 или 80 одновременно, исключений не будет. Так будет ли это решено в упомянутой проблеме хикари путем возобновления соединений в случае виртуальных потоков? Или это естественная проблема и мы обязаны использовать семафоры?

Shoaib 21.05.2024 13:13

Использование семафора не позволит вам создавать неограниченное количество соединений, поэтому ваше приложение не выйдет из строя, но будет создавать соединение для каждой задачи, поэтому производительность будет хуже, чем при использовании традиционного пула потоков. В традиционном варианте для каждого потока будет создано одно соединение, и в какой-то момент каждая задача будет повторно использовать эти соединения. При использовании виртуальных потоков каждая задача будет создавать собственное соединение, что отнимает время и снижает производительность.

Jose Antonio 22.05.2024 11:32
Ответ принят как подходящий

Я считаю, что проблема связана с инициированием бесчисленных виртуальных потоков, каждый из которых хочет создавать новые соединения. Было бы лучше, если бы у меня было 90 доступных подключений к базе данных, я ограничил бы используемые потоки платформы до 90 с помощью jdk.virtualThreadScheduler.maxPoolSize. Этот пул размером в 90 потоков платформы создает 90 соединений. Затем инициируйте множество виртуальных потоков, каждый из которых не создает новые соединения с БД, а повторно использует соединения этих 90 потоков платформы.

Как предложил Бэзил Бурк, я ограничил отправку запросов через виртуальные потоки с использованием семафора.

private void executeJob(Job job) {
     semaphore.acquire();
     jobExecutor.execute(job);
     semaphore.release();
}

и чтобы избежать блокировки вызовов, ожидающих получения, выполняйте вышеуказанный метод только в том случае, если:

if (semaphore.availablePermits() > 0) {
    Job = //find job
    executeJob(job);
}

Кажется, это работает и ограничивает мои подключения к базе данных. Кроме того, несмотря на медленные ответы, пропускная способность увеличилась с 8 запросов/с до 70 запросов/с.

Редактировать: Как упоминал @igor.zh, и во время тестирования доступные разрешения являются лишь приблизительными, я изменил свой метод следующим образом. Также я использовал пул tomcat-jdbc и переключился на Hikari, который кажется более эффективным с соединениями.

private void executeJob(Job job) {
    if (!semaphore.tryAcquire()) {
        //let the thread die
        return;
    }
    try {
        jobExecutor.execute(job);
    } finally {
        semaphore.release();
    }
}

Не используйте Semaphore.availablePermits для синхронизации! В Javadoc этого метода говорится: Этот метод обычно используется для целей отладки и тестирования. Ваш последний фрагмент не является атомарным: например. сразу после того, как возвращается ненулевое количество разрешений, могут подключиться один или несколько других потоков, и счетчик может быть равен 0. Ваш первый фрагмент вроде бы в порядке (за исключением того, что все же лучше сделать release в блоке finally оператора try-finally ). Я думаю, блокировка - это то, что именно вам нужно (особенно когда вы используете виртуальные потоки - она ​​просто освободит Carrier).

igor.zh 17.05.2024 21:08

Другие вопросы по теме