Почему шина событий Vertx блокируется при высокой нагрузке?

Я пытаюсь отправить много сообщений по шине событий Vertx, как это (кластеризовано с помощью Hazelcast) без блокировки:

EventBus eb = vertx.eventBus();

for (int i = 0; i < 100; i++) {
  vertx.setPeriodic(1, num -> {
    eb.send("clusteredEndpoint", "ping");
  });
}

Когда количество таймеров меньше, все работает нормально, но около 100 таймеров я получаю эту ошибку.

Мне интересно, как масштабироваться до 100 тыс. событий/с без блокировки (для справки я написал тест Vertx WebSocket, который может превысить это число).

Если это невозможно, я хотел бы понять, что блокирует - похоже, это что-то в этом классе: https://github.com/eclipse-vertx/vert.x/blob/master/src/main/java/io /vertx/core/eventbus/impl/clustered/Serializer.java

Для справки - этот код не блокируется - даже с 1000 таймерами:

HttpClient client = vertx.createHttpClient();
client.webSocket(8080, "localhost", "/", res -> {
  for (int i = 0; i < 1000; i++) {
    vertx.setPeriodic(1, num -> {
      res.result().writeTextMessage("ping");
    });
  }
});
});

15 декабря 2020 г., 10:54:38 ВНИМАНИЕ: Thread Thread[vert.x-eventloop-thread-1,5,main] был заблокирован на 36794 мс, ограничение по времени 2000 мс io.vertx.core.VertxException: поток заблокирован в io.vertx.core.impl.future.FutureImpl.addListener(FutureImpl.java:140) в io.vertx.core.impl.future.PromiseImpl.addListener(PromiseImpl.java:23) в io.vertx.core.impl.future.FutureImpl.onComplete(FutureImpl.java:133) в io.vertx.core.impl.future.PromiseImpl.onComplete(PromiseImpl.java:23) в io.vertx.core.spi.cluster.impl.selector.Selectors.withSelector(Selectors.java:48) в io.vertx.core.spi.cluster.impl.DefaultNodeSelector.selectForSend(DefaultNodeSelector.java:42) в io.vertx.core.eventbus.impl.clustered.ClusteredEventBus$$Lambda$1065/195695453.accept(Неизвестно Источник) в io.vertx.core.eventbus.impl.clustered.Serializer$SerializerQueue$SerializedTask.process(Serializer.java:147) в io.vertx.core.eventbus.impl.clustered.Serializer$SerializerQueue.checkPending(Serializer.java:94) в io.vertx.core.eventbus.impl.clustered.Serializer$SerializerQueue.add(Serializer.java:114) в io.vertx.core.eventbus.impl.clustered.Serializer.queue(Serializer.java:65) в io.vertx.core.eventbus.impl.clustered.ClusteredEventBus.sendOrPub(ClusteredEventBus.java:172) в io.vertx.core.eventbus.impl.OutboundDeliveryContext.next(OutboundDeliveryContext.java:127) в io.vertx.core.eventbus.impl.EventBusImpl.sendOrPubInternal(EventBusImpl.java:394) в io.vertx.core.eventbus.impl.EventBusImpl.sendOrPubInternal(EventBusImpl.java:400) в io.vertx.core.eventbus.impl.EventBusImpl.send(EventBusImpl.java:103) в io.vertx.core.eventbus.impl.EventBusImpl.send(EventBusImpl.java:97) в io.vertx.example.EBtestClient.lambda$start$0(EBtestClient.java:22) в io.vertx.example.EBtestClient$$Lambda$1056/1487417027.handle(Неизвестно Источник) в io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:939) в io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:910) в io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:52) в io.vertx.core.impl.ContextImpl.emit(ContextImpl.java:294) в io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:24) в io.vertx.core.impl.AbstractContext.emit(AbstractContext.java:49) в io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:24) в io.vertx.core.impl.VertxImpl$InternalTimerHandler.run(VertxImpl.java:933) в io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98) в io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:176) в io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) в io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) на io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) на io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) в io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) в io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) на java.lang.Thread.run(Thread.java:748)

Примечание: ваш код WebSockets не эквивалентен вашему коду EventBus. Эквивалентно было бы, если бы вы открыли веб-сокет внутри своего обратного вызова, даже с упоминанием кэширования @tsegismont.

Alexey Soshin 16.12.2020 22:46
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
1
1 233
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Во-первых, вы будете выполнять 100 задач в одном потоке, потому что Vert.x имеет сходство с потоками. Если вы хотите этого избежать, запускайте их на отдельных вершинах. Но все же, я не думаю, что у вас 100 процессоров, так что споров будет много.

И настройка их всех на выполнение каждую 1 мс означает, что они каким-то образом должны заканчиваться за 10 микросекунд каждый, включая сетевой код, потому что вы используете кластеризованную шину событий.

Итак, это то, как написан тест, а не то, что делает Vert.x.

Если вы действительно хотите протестировать такую ​​нагрузку (здесь мы говорим о 100 000 об/с), распределите свои запросы по нескольким машинам.

Но тогда я не уверен, что Hazelcast рассчитан на такую ​​нагрузку.

Если вы хотите знать, что на самом деле блокирует, я предполагаю, что это часть кода:

https://github.com/eclipse-vertx/vert.x/blob/master/src/main/java/io/vertx/core/spi/cluster/impl/DefaultNodeSelector.java#L43

Поскольку у меня нет готовой кластеризованной Vert.x, я не могу подтвердить, правильно ли мое предположение.

Я не уверен, при чем здесь Hazelcast — знаете ли вы, какой код конкретно блокирует — я не думаю, что это Hazelcast? Я думал, что «отправить» должен быть асинхронным?

Charlie 16.12.2020 14:42

Алексей прав: попробуйте распределить свои запросы по ядрам, используя несколько инжекторных вертикул, а если этого недостаточно, то по нескольким машинам.

tsegismont 16.12.2020 14:50

@Alexey: когда адрес всегда один и тот же, менеджер кластера больше не участвует: Vert.x кэширует адресатов и использует свои собственные соединения для передачи сообщений.

tsegismont 16.12.2020 14:51

Я знаю, что могу увеличить производительность за счет масштабирования - вопрос в том, как использовать шину событий без блокировки и почему она блокируется только при высокой нагрузке. Похоже, что невозможно использовать шину событий без блокировки, что отличается от, скажем, WebSockets, которые даже с 1000 таймерами все еще не блокируются.

Charlie 16.12.2020 15:28

@Charlie Я отредактировал свой ответ с моим подозреваемым, а также примечание о сравнении между EventBus и WebSockets. Надеюсь, это поможет.

Alexey Soshin 16.12.2020 23:09
Ответ принят как подходящий

Вот мой анализ после дальнейшего расследования:

При использовании шины событий Vertx для удаленной связи, когда потребитель оказывается перегруженным, он перестает отвечать на запросы. Это приводит к блокировке производителя, и я зафиксировал 3 разных блокирующих сообщения (см. ниже). После предупреждения о блокировке появляется следующее предупреждение:

ВНИМАНИЕ: Нет pong с сервера 2d1fb2ce-940f-4b60-bf60-39847f31bcaf - будет считать его мертвым

Ответ на мой вопрос заключается в том, что не имеет значения, «почему» он блокируется, потому что он мертв (потому что он достиг определенного предела).

Я удивлен, что Vert.x не справляется с этим более изящно - например, может генерировать исключение.

Ошибка блокировки №1

Поток заблокирован на io.vertx.core.impl.future.FutureImpl.addListener(FutureImpl.java:140) на io.vertx.core.impl.future.PromiseImpl.addListener(PromiseImpl.java:23) на io.vertx.core .impl.future.FutureImpl.onComplete(FutureImpl.java:133) в io.vertx.core.impl.future.PromiseImpl.onComplete(PromiseImpl.java:23) в io.vertx.core.spi.cluster.impl.selector .Selectors.withSelector(Selectors.java:48) в

Ошибка блокировки №2

io.vertx.core.VertxException: поток заблокирован в java.nio.charset.CharsetEncoder.(CharsetEncoder.java:198) в java.nio.charset.CharsetEncoder.(CharsetEncoder.java:233) в sun.nio.cs.UTF_8$Encoder.(UTF_8.java:558) в sun.nio.cs.UTF_8$Encoder.(UTF_8.java:554) на sun.nio.cs.UTF_8.newEncoder(UTF_8.java:72)

Ошибка блокировки №3

io.vertx.core.VertxException: поток заблокирован в io.vertx.core.eventbus.impl.clustered.ConnectionHolder.writeMessage(ConnectionHolder.java:93) на io.vertx.core.eventbus.impl.clustered.ClusteredEventBus.sendRemote(ClusteredEventBus.java:332) на io.vertx.core.eventbus.impl.clustered.ClusteredEventBus.sendToNode(ClusteredEventBus.java:283)

Другие вопросы по теме