Я использую RxJava2 Flowables, подписавшись на поток событий из PublishSubject. Он используется в приложении корпоративного уровня, и у нас нет возможности отбрасывать какие-либо события. Я использую версию RxJava 2.2.8.
Я использую BackpressureStrategy.BUFFER, так как не хочу терять ни одно из своих событий.
Кроме того, я снова буферизуюсь на 50000 или 3 минуты, в зависимости от того, что наступит раньше. Это я делаю, так как хочу консолидировать события и потом их обрабатывать.
Но я получаю следующие ошибки через несколько минут моего запуска
io.reactivex.exceptions.MissingBackpressureException: Could not emit buffer due to lack of requests
at io.reactivex.internal.subscribers.QueueDrainSubscriber.fastPathOrderedEmitMax(QueueDrainSubscriber.java:121)
at io.reactivex.internal.operators.flowable.FlowableBufferTimed$BufferExactBoundedSubscriber.run(FlowableBufferTimed.java:569)
at io.reactivex.Scheduler$Worker$PeriodicTask.run(Scheduler.java:479)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
Я попытался увеличить размер буфера путем настройки, но поведение не изменилось.
System.setProperty("rx2.buffer-size", "524288");
Кроме того, если я буферизуюсь в течение более длительного времени вместо 3 минут, я получаю исключение после гораздо более длительного времени, вероятно, потому, что мой нисходящий поток работает лучше, когда события консолидируются больше. Однако у меня нет такого выбора, потому что это живые события и их нужно обрабатывать немедленно (через 3-5 минут).
Я также пробовал thread.sleep() перед вызовом «subscription.next» в случае ошибки, но все равно получал те же результаты.
keySubject.hide()
.toFlowable(BackpressureStrategy.BUFFER)
.parallel()
.runOn(Schedulers.computation())
.map(e -> e.getContents())
.flatMap(s -> Flowable.fromIterable(s))
.sequential()
.buffer(3,TimeUnit.MINUTES,50000)
.subscribe(new Subscriber<List<String>>() {
@Override
public void onSubscribe(Subscription var1) {
innerSubscription = var1;
innerSubscription.request(1L);
}
@Override
public void onNext(List<String> logs) {
Subscription.request(1L);
/// Do some logic here
}
Я хочу знать, как мне справиться с обратным давлением, чтобы избежать этого исключения? Это исключение из-за метода ".buffer" Есть ли способ проверить состояние этих буферов. Также почему, даже если я увеличу размер rx2.buffer-size, я все равно получу исключение за то же время. В идеале система должна работать дольше с большим размером буфера, если исключение вызвано переполнением буфера.
Любая помощь по причине этого сообщения «Не удалось создать буфер из-за отсутствия запросов в» будет отличной.




Дело в том, почему вы используете предмет, который не знает обратного давления? Ты используешь его как автобус для бедняков? Кроме того, предполагая, что e.getContents() является простым геттером, я считаю, что вы можете заменить весь этот блок
.toFlowable(BackpressureStrategy.BUFFER)
.parallel()
.runOn(Schedulers.computation())
.map(e -> e.getContents())
.flatMap(s -> Flowable.fromIterable(s))
.sequential()
.buffer(3,TimeUnit.MINUTES,50000)
.subscribe(new Subscriber<List<String>>() { ... });
с участием
.flatMapIterable(e -> e.getContents())
.buffer(3,TimeUnit.MINUTES,50000)
.rebatchRequests(1)
.observeOn(Schedulers.computation())
.doOnNext(s -> /* Do some logic here */)
.subscribe();
1. rebatchRequests будет сигнализировать восходящему потоку о необходимости отправки одного элемента за раз. Однако выше по течению находится buffer, что означает, что элементы представляют собой коллекции до 50000 исходных элементов. 2. 75% в данном случае равно 0; это означает, что он запросит следующий буфер из 50000 после того, как получит его. 3. Это по-прежнему означает, что ваш апстрим генерирует элементы быстрее, чем вы можете их обработать. Можете ли вы запустить параллельные экземпляры сегмента Do some logic here?
Привет, Тассос! Большое спасибо за изучение этого вопроса. Я очень взволнован этим предложением и скоро попробую его. У меня есть несколько вопросов по вашему ответу, прошу вас их уточнить. 1. Вы предлагаете не использовать поток, так как я не могу контролировать свой апстрим? 2. Я читал о rebatchRequest(n), и он говорит о 75% n в последующем запросе. но когда вы говорите rebatchRequest(1), что в этом случае будет 75%. Или он будет учитывать 75% наших событий в буфере (что составляет 50000 или 3 минуты) 3. Любая конкретная причина, по которой метод onError не переопределяется.