Странное исключение rejectedexecutionexception в кодах reactor

У меня вопрос по поводу странного исключения.

Я получил несколько исключений RejectedExecutionException на продакшене (около 200 в день). Reactor запускается в настраиваемом планировщике с использованием Schedulers.fromExecutorService ().

Итак, я сначала проверил размер очереди или что-то еще у ExcutorService, но все в порядке. нет полной очереди. выключения нет.

Это мой код, который вызвал исключение.

return reactionRepository
        .getPage(context, scanQuery)
        .buffer(100)
        .concatMap(Flux::fromIterable)
        .flatMapSequential(likeSn -> findOne(context, parentId, likeSn)
                .transform(ReactiveHelpers.defaultIfNotFoundOrError(Optional.empty())))
        .filter(Optional::isPresent)
        .map(Optional::get)
        .doOnError(e -> log.error("Failed to find likes", e));

getPage () возвращает объект Flux. Следующие коды являются основным кодом для чтения информации из кластеров Redis.

...
return bucketList.publishOn(redisScheduler)
                   .filter(val -> val.getScore() >= 0)
                   .map(Value::getValue)

И это журнал исключений, который у меня есть в моем файле журнала. этот журнал ошибок был записан в приведенной выше строке .doOnError(e -> log.error("Failed to find likes", e));.

reactor.core.Exceptions$ReactorRejectedExecutionException: Scheduler unavailable
    at reactor.core.Exceptions.failWithRejected(Exceptions.java:249)
    at reactor.core.publisher.Operators.onRejectedExecution(Operators.java:412)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.trySchedule(FluxPublishOn.java:293)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.request(FluxPublishOn.java:261)
    at reactor.core.publisher.FluxBuffer$BufferExactSubscriber.request(FluxBuffer.java:111)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:227)
    at reactor.core.publisher.FluxBuffer$BufferExactSubscriber.onSubscribe(FluxBuffer.java:125)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onSubscribe(FluxPublishOn.java:209)
    at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:78)
    at reactor.core.publisher.FluxPublishOn.subscribe(FluxPublishOn.java:108)
    at reactor.core.publisher.FluxBuffer.subscribe(FluxBuffer.java:72)
    at reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:121)
    at reactor.core.publisher.Flux.subscribe(Flux.java:6877)
    at reactor.core.publisher.FluxMergeSequential.subscribe(FluxMergeSequential.java:99)
    at reactor.core.publisher.FluxFilter.subscribe(FluxFilter.java:52)
    at reactor.core.publisher.FluxMap.subscribe(FluxMap.java:62)
    at reactor.core.publisher.FluxPeek.subscribe(FluxPeek.java:83)
    at reactor.core.publisher.MonoCollectList.subscribe(MonoCollectList.java:59)
    at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59)
    at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3080)
    at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128)
    at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3080)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:372)
    at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554)
    at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630)
    at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696)
    at reactor.core.publisher.FluxFlatMap.subscribe(FluxFlatMap.java:97)
    at reactor.core.publisher.Flux.subscribe(Flux.java:6877)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:372)
    at reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380)
    at reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316)
    at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201)
    at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:244)
    at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:202)
    at reactor.core.publisher.FluxGroupBy$GroupByMain.onSubscribe(FluxGroupBy.java:165)
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:140)
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:64)
    at reactor.core.publisher.FluxGroupBy.subscribe(FluxGroupBy.java:82)
    at reactor.core.publisher.FluxFlatMap.subscribe(FluxFlatMap.java:97)
    at reactor.core.publisher.MonoCollect.subscribe(MonoCollect.java:66)
    at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59)
    at reactor.core.publisher.MonoOnAssembly.subscribe(MonoOnAssembly.java:76)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3080)
    at reactor.core.publisher.MonoSubscribeOn$SubscribeOnSubscriber.run(MonoSubscribeOn.java:123)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
    at io.micrometer.core.instrument.AbstractTimer.recordCallable(AbstractTimer.java:143)
    at io.micrometer.core.instrument.Timer.lambda$wrap$1(Timer.java:137)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.RejectedExecutionException: Scheduler unavailable
    at reactor.core.Exceptions.<clinit>(Exceptions.java:502)
    at reactor.core.publisher.Operators.onOperatorError(Operators.java:345)
    at reactor.core.publisher.Operators.onOperatorError(Operators.java:323)
    at reactor.core.publisher.Operators.onOperatorError(Operators.java:305)
    at reactor.core.publisher.MonoError.subscribe(MonoError.java:53)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3080)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:78)
    at reactor.core.publisher.Operators.complete(Operators.java:128)
    at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3080)

Я не знаю, почему возникает это исключение.

В любом случае, при отладке этой проблемы я видел случай выброса исключения только один раз. (Я не уверен, что это настоящая причина.)

В ExecutorSchedulerWorker::schedule оператор !tasks.add(r) оценивается как истина, поэтому возникает исключение.

ExecutorTrackedRunnable r = new ExecutorTrackedRunnable(task, this, true);
        if (!tasks.add(r)) {
            throw Exceptions.failWithRejected();
        }

Это ключ к разгадке, который у меня есть только сейчас.

Кто-нибудь знает эту проблему? Любые предложения могут мне помочь.

Редактировать 1. Добавьте указанный код. Это вспомогательный код для обработки моего пользовательского исключения

public static <T> Function<Mono<T>, Publisher<T>> defaultIfNotFoundOrError(T defaultValue) {
    return source -> source.onErrorResume(ReactionStorageException.class,
                                          e -> {
                                              if (e.getErrorCode() == ReactionStorageErrorCode.NOT_FOUND) {
                                                   return Mono.just(defaultValue);
                                              } else {
                                                   return Mono.error(e);
                                              }
                                          });
}

И findOne ()

public Mono<Optional<Like>> findOne(final RequesterContext context,
                                    final String parentId,
                                    final int sn,
                                    final boolean handleFaulted) {
    Preconditions.checkArgument(!Strings.isNullOrEmpty(parentId),
                                "parentId must not be an empty value");
    Preconditions.checkArgument(sn >= StorageConstants.BASE_SN,
                                "Serial number must be greater than BASE_SN value");

    final String likeInfoKey = RedisKeys.reactionInfo(reactionType, parentId, sn);

    return cmds.hgetall(likeInfoKey)
               .publishOn(redisScheduler)
               .flatMap(m -> Mono.justOrEmpty(LikeRedisMapper.from(m)))
               .switchIfEmpty(ReactiveHelpers.mapOrEmpty(handleFaulted,
                                                         requestFaultedLike(context, parentId, sn)))
               .switchIfEmpty(ExceptionUtils.generate(ReactionStorageErrorCode.NOT_FOUND,
                                                      "Like(%s, %d) cannot be found",
                                                      parentId, sn))
               .map(Optional::ofNullable)
               .doOnError(e -> log.trace("Failed to find a like", e));
}

Обновление 2. После глубокой отладки возникло исключение из-за отмены источника. Эта отмена вызвана Mono.zip (A, B, C ...). Выше источник - B. Если A - пустой источник, B следует отменить. но иногда запрос B обрабатывается после того, как был получен сигнал отмены.

Не могли бы вы указать точную версию reactor? Это помогает искать источники по трассировке стека.

Oleg Kurbatov 26.10.2018 12:28

@OlegKurbatov использую версию 3.1.8

Lee GiTack 26.10.2018 13:48

Так вот он return Mono.error(e);. Отлаживайте это место, чтобы понять, какое исключение происходит, или перепишите все, как я предлагал обрабатывать все исключения сразу.

Oleg Kurbatov 26.10.2018 13:53

Эта логика хорошо работает в большинстве случаев, включая случай исключения в моем модульном тесте. В среде разработки я не могу найти никаких исключений, как указано выше. У вашего предложения нет проблем, если ошибка может быть устранена элегантным способом. Но я хочу знать настоящую причину RejectedExecutionException. Иногда я получал исключение с почти таким же параметром. Мне любопытно, почему это исключение.

Lee GiTack 26.10.2018 14:08

Вы всегда можете записать его в журнал перед тем, как выбросить.

Oleg Kurbatov 26.10.2018 14:11

Я заметил, что код содержит много ссылок на Optional. Я думаю, что в этом случае было бы намного чище придерживаться реактора и его Mono.

Oleg Kurbatov 26.10.2018 14:13

Спасибо вам за ваши предложения. Я буду переосмысливать логику и рефакторинг. Но все равно

Lee GiTack 26.10.2018 14:19

Но я все еще хочу знать, почему я иногда получал RejectedExecutionException.

Lee GiTack 26.10.2018 14:25

Это может произойти после неперехваченного исключения в потоке redisScheduler, которое не регистрируется. После очистки кода и избавления от одних и тех же сообщений журнала в разных местах (например, «Не удалось найти лайки») и установки обработчика неперехваченных исключений для основного потока (Thread.setDefaultUncaughtExceptionHandler (...)) вы обязательно найдете точное место и вид первопричины.

Oleg Kurbatov 26.10.2018 14:36

ммм, я попробую. Спасибо!

Lee GiTack 26.10.2018 14:40
4
10
1 994
0

Другие вопросы по теме