Как исправить «MissingBackpressureException»

Я использую RxJava2 Flowables, подписавшись на поток событий из PublishSubject. Он используется в приложении корпоративного уровня, и у нас нет возможности отбрасывать какие-либо события. Я использую версию RxJava 2.2.8.

Я использую BackpressureStrategy.BUFFER, так как не хочу терять ни одно из своих событий.

Кроме того, я снова буферизуюсь на 50000 или 3 минуты, в зависимости от того, что наступит раньше. Это я делаю, так как хочу консолидировать события и потом их обрабатывать.

Но я получаю следующие ошибки через несколько минут моего запуска

io.reactivex.exceptions.MissingBackpressureException: Could not emit buffer due to lack of requests
at io.reactivex.internal.subscribers.QueueDrainSubscriber.fastPathOrderedEmitMax(QueueDrainSubscriber.java:121)
at io.reactivex.internal.operators.flowable.FlowableBufferTimed$BufferExactBoundedSubscriber.run(FlowableBufferTimed.java:569)
at io.reactivex.Scheduler$Worker$PeriodicTask.run(Scheduler.java:479)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)

Я попытался увеличить размер буфера путем настройки, но поведение не изменилось.

System.setProperty("rx2.buffer-size", "524288");

Кроме того, если я буферизуюсь в течение более длительного времени вместо 3 минут, я получаю исключение после гораздо более длительного времени, вероятно, потому, что мой нисходящий поток работает лучше, когда события консолидируются больше. Однако у меня нет такого выбора, потому что это живые события и их нужно обрабатывать немедленно (через 3-5 минут).

Я также пробовал thread.sleep() перед вызовом «subscription.next» в случае ошибки, но все равно получал те же результаты.

keySubject.hide()
.toFlowable(BackpressureStrategy.BUFFER)
.parallel()
.runOn(Schedulers.computation())
.map(e -> e.getContents())
.flatMap(s -> Flowable.fromIterable(s))
.sequential()
.buffer(3,TimeUnit.MINUTES,50000)
.subscribe(new Subscriber<List<String>>() {

@Override
  public void onSubscribe(Subscription var1) {
   innerSubscription = var1;
innerSubscription.request(1L);
 }

@Override
public void onNext(List<String> logs) {
    Subscription.request(1L);

///   Do some logic here

}

Я хочу знать, как мне справиться с обратным давлением, чтобы избежать этого исключения? Это исключение из-за метода ".buffer" Есть ли способ проверить состояние этих буферов. Также почему, даже если я увеличу размер rx2.buffer-size, я все равно получу исключение за то же время. В идеале система должна работать дольше с большим размером буфера, если исключение вызвано переполнением буфера.

Любая помощь по причине этого сообщения «Не удалось создать буфер из-за отсутствия запросов в» будет отличной.

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
3
0
1 505
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Дело в том, почему вы используете предмет, который не знает обратного давления? Ты используешь его как автобус для бедняков? Кроме того, предполагая, что e.getContents() является простым геттером, я считаю, что вы можете заменить весь этот блок

.toFlowable(BackpressureStrategy.BUFFER)
.parallel()
.runOn(Schedulers.computation())
.map(e -> e.getContents())
.flatMap(s -> Flowable.fromIterable(s))
.sequential()
.buffer(3,TimeUnit.MINUTES,50000)
.subscribe(new Subscriber<List<String>>() { ... });

с участием

.flatMapIterable(e -> e.getContents())
.buffer(3,TimeUnit.MINUTES,50000)
.rebatchRequests(1)
.observeOn(Schedulers.computation())
.doOnNext(s -> /* Do some logic here */)
.subscribe();

Привет, Тассос! Большое спасибо за изучение этого вопроса. Я очень взволнован этим предложением и скоро попробую его. У меня есть несколько вопросов по вашему ответу, прошу вас их уточнить. 1. Вы предлагаете не использовать поток, так как я не могу контролировать свой апстрим? 2. Я читал о rebatchRequest(n), и он говорит о 75% n в последующем запросе. но когда вы говорите rebatchRequest(1), что в этом случае будет 75%. Или он будет учитывать 75% наших событий в буфере (что составляет 50000 или 3 минуты) 3. Любая конкретная причина, по которой метод onError не переопределяется.

user2179923 16.05.2019 19:13

1. rebatchRequests будет сигнализировать восходящему потоку о необходимости отправки одного элемента за раз. Однако выше по течению находится buffer, что означает, что элементы представляют собой коллекции до 50000 исходных элементов. 2. 75% в данном случае равно 0; это означает, что он запросит следующий буфер из 50000 после того, как получит его. 3. Это по-прежнему означает, что ваш апстрим генерирует элементы быстрее, чем вы можете их обработать. Можете ли вы запустить параллельные экземпляры сегмента Do some logic here?

Tassos Bassoukos 16.05.2019 23:21

Другие вопросы по теме