Я работаю над проектом с использованием rxjava1, и у меня есть цепочка Observable, которая иногда будет содержать тысячи наблюдаемых, объединенных или соединенных вместе. Когда это произойдет, произойдет исключение StackOverflow, и мы получим что-то вроде этого:
java.lang.StackOverflowError
at java.util.HashMap.putVal(HashMap.java:631)
at java.util.HashMap.put(HashMap.java:612)
at rx.internal.operators.OnSubscribeToMap$ToMapSubscriber.onNext(OnSubscribeToMap.java:127)
at rx.internal.operators.OnSubscribeFilter$FilterSubscriber.onNext(OnSubscribeFilter.java:76)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
И трассировка стека будет продолжаться для сотен строк. Единственный связанный пост, который я видел об этом, - это проблема в github: https://github.com/ReactiveX/RxJava/issues/3035. Но предлагаемое решение добавления наблюдаемых в список - это то, что мы использовали и не работает.
Что я могу сделать, чтобы предотвратить эти исключения StackOverflow? Мне нужно сделать какое-то дросселирование или противодавление?
Вот пример того, как выглядит текущий код и вызывает переполнение стека:
public Observable<Map<String, JsonObject>> extractTopLevelSummariesFromForms(JsonArray summaries, Func2<String, String, Observable<JsonObject>> summaryGatherer) {
List<Observable<JsonObject>> summaryObservables = new LinkedList<>();
summaries.stream()
.map(JsonUtil::safeJsonObject)
.filter(summary -> StringUtils.isNotEmpty(summary.getString(NAME))|| StringUtils.isNotEmpty(summary.getString(Form.TITLE)))
.forEach(summary -> {
if (StringUtils.isNotEmpty(summary.getString(TEXT)))
summaryObservables.add(gatherSummariesFromElement(summary.getString(Summary.SHORT_NAME), Summary.SummaryValues.FORM,
summary.getString(Summary.SHORT_NAME) + ".hidden",
summary.getString(VALUE), summaryGatherer));
if (StringUtils.isNotEmpty(summary.getString(Form.TEXT)))
summaryObservables.add(gatherSummariesFromElement(summary.getString(Summary.SHORT_NAME), Summary.SummaryValues.FORM,
summary.getString(Summary.SHORT_NAME) + ".title",
summary.getString(Summary.VALUE), summaryGatherer, true));
});
return Observable.merge(Observable.from(summaryObservables))
.filter(summaryResult -> summaryResult != null)
.toMap(summaryResult -> summaryResult.getString(KEY), summaryResult -> summaryResult.getJsonObject(TEXT));
}
private Observable<JsonObject> gatherSummariesFromElement(String parentName, String parentType, String elementName, String summaryValue, Func2<String, String, Observable<JsonObject>> summaryGatherer, Set<String> visited, boolean isList) {
if (visited.contains(elementName))
return Observable.just(null);
visited.add(elementName);
Map<String, JsonObject> summariesMap = new HashMap<>();
summariesMap.put(elementName, new JsonObject().put(Summary.SummaryValues.FORM, form).put(SUMMARY_TYPE, parentType));
Set<String> variables = TextEngine.getVariables(summariesMap);
Observable<JsonObject> elementSummaryObservable = Observable.just(getSummaryEntry(elementName, form, parentType, isList));
if (variables != null && !variables.isEmpty()) {
elementSummaryObservable = elementSummaryObservable.mergeWith(Observable.from(variables).flatMap(variable -> {
if (StringUtils.contains(variable, ".") && StringUtils.equals(parentName, StringUtils.split(variable, ".")[0]))
return Observable.just(null);
else
return summaryGatherer.call(parentName, variable).flatMap(variableEntry -> {
if (variableEntry == null)
return Observable.just(null);
else
return gatherSummariesFromElement(parentName, variableEntry.getString(SOURCE_TYPE), variable, variableEntry.getString(FORM), summaryGatherer, visited, variableEntry.getBoolean(Summary.SummaryValues.IS_LIST, false));
});
}));
}
return elementSummaryObservable;
}
Я пробовал запускать все в планировщике Schedulers.computation(), кроме сетевых запросов, они выполняются в планировщиках Schedulers.io(), и я все еще получаю переполнение стека:
Exception in thread "pool-26-thread-2" java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.Worker thread.
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:59)
at rx.internal.schedulers.ExecutorScheduler$ExecutorSchedulerWorker.run(ExecutorScheduler.java:107)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.StackOverflowError
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
@DaveMoten В проблеме с github говорится, что нужно использовать список наблюдаемых или выполнить слияние с PublishSubject. Мы составили список наблюдаемых, а затем объединили их, и вот как мы получили ошибку stackoverflow. Другой вариант, с PublishSubject, я не думаю, что соответствует нашим потребностям, потому что PublishSubject делает только это: Subject that, once an Observer has subscribed, emits all subsequently observed items to the subscriber.
Это не говорит список!
тогда помогите мне понять, что вы видите, потому что я вижу в этом обходной путь, к которому относится проблема с github: List<Observable<String>> observables = new ArrayList<>(); for (int i = 0; i < 1000; i++) { observables.add(Observable.just("my string")); } Observable.merge(observables).last().toBlocking().first();
Проблема говорит вам использовать merge(Observable<Observable>), а не merge(List<Observable>), который представляет собой другую перегрузку слияния. Попробуйте merge(Observable.fromIterable(observables))
Пожалуйста, предоставьте то, что вы пробовали для обходного пути, потому что я не могу представить, что обходной путь в проблеме 3035 все равно не удастся для вас, если у вас все еще нет глубоко вложенных цепочек. Также убедитесь, что вы изменили все места с неправильным шаблоном mergeWith.
@DaveMoten: добавлено несколько примеров
@akarnokd: добавлено несколько примеров
У вас есть рекурсия с gatherSummariesFromElement, которая все еще может создавать глубокие цепочки. Используйте .subscribeOn(Schedulers.single()), чтобы разбить стек при рекурсии.
Единственные планировщики, которые у меня есть: computation, from, immediate, io, newThread, test, trampoline, reset, shutdown, start.




gatherSummariesFromElement, которые, возможно, очень глубокоObservable.just(null) с последующей фильтрацией нулевых значений вы можете использовать Observable.empty().summaryObservables выглядит излишним. Вы можете составить список действительных сводок вместо, а затем обработать их в flatMapgatherSummariesFromElement на создание рекурсивного списка элементов, а затем создайте наблюдаемый объект из этого списка._
public Observable<Map<String, JsonObject>> extractTopLevelSummariesFromForms(JsonArray summaries, Func2<String, String, JsonObject> summaryGatherer) {
List<JsonObject> validSummaries = new LinkedList<>();
summaries.stream()
.map(JsonUtil::safeJsonObject)
.filter(summary -> StringUtils.isNotEmpty(summary.getString(NAME)) || StringUtils.isNotEmpty(summary.getString(Form.TITLE)))
.forEach(validSummaries::add);
Set<String> visited = new HashSet<>();
return Observable.from(validSummaries)
.flatMap(summary -> {
if (StringUtils.isNotEmpty(summary.getString(TEXT)))
Observable.from(gatherSummariesFromElement(summary.getString(Summary.SHORT_NAME), Summary.SummaryValues.FORM,
summary.getString(Summary.SHORT_NAME) + ".hidden",
summary.getString(VALUE), visited, summaryGatherer)));
if (StringUtils.isNotEmpty(summary.getString(Form.TEXT)))
Observable.from(gatherSummariesFromElement(summary.getString(Summary.SHORT_NAME), Summary.SummaryValues.FORM,
summary.getString(Summary.SHORT_NAME) + ".title",
summary.getString(Summary.VALUE), summaryGatherer, visited,true)));
})
.toMap(summaryResult -> summaryResult.getString(KEY), summaryResult -> summaryResult.getJsonObject(TEXT));
}
private List<JsonObject> gatherSummariesFromElement(String parentName, String parentType, String elementName, String summaryValue, Func2<String, String, JsonObject> summaryGatherer, Set<String> visited, boolean isList) {
if (visited.contains(elementName))
return Collections.emptyList();
visited.add(elementName);
List<JsonObject> result = new ArrayList<>()
Map<String, JsonObject> summariesMap = new HashMap<>();
summariesMap.put(elementName, new JsonObject().put(Summary.SummaryValues.FORM, form).put(SUMMARY_TYPE, parentType));
Set<String> variables = TextEngine.getVariables(summariesMap);
result.add(getSummaryEntry(elementName, form, parentType, isList));
if (variables != null && !variables.isEmpty()) {
for (String variable : variables) {
if (StringUtils.contains(variable, ".") && StringUtils.equals(parentName, StringUtils.split(variable, ".")[0])) {
// do nothing
} else {
JsonObject variableEntry = summaryGatherer.call(parentName, variable)
if (variableEntry != null) {
result.addAll(gatherSummariesFromElement(parentName, variableEntry.getString(SOURCE_TYPE), variable, variableEntry.getString(FORM), summaryGatherer, visited, variableEntry.getBoolean(Summary.SummaryValues.IS_LIST, false));
}
}
}
}
return result;
}
Чтобы решить эту проблему, используйте
merge(Observable<Observable<..>>)илиPublishSubject. Что именно вы пробовали (код, пожалуйста)?