Обновление реактора проекта с v2 до v3?

Неужели нет официального руководства по обновлению реактора проекта с v2 до v3?

Я искал повсюду, даже используя waybackmachine, но мне не удалось найти документацию по пути обновления с v2 до v3v2.0.6.RELEASE до v3.5.15, если быть более точным).

Я понимаю, что немного опоздал с этим вопросом, учитывая, что первая v3 вышла где-то в 2017 году, но текущая версия все еще v3, так что, думаю, еще не поздно, верно? ?

Лучший ресурс, который я нашел на данный момент, — это комментарий к проблеме GH, но у меня есть еще несколько вопросов.

Конкретно, что я использую в v3 вместо этих v2 классов:

  1. reactor.core.Dispatcher
  2. reactor.core.dispatch.SynchronousDispatcher
  3. reactor.core.dispatch.RingBufferDispatcher
  4. reactor.core.processor.RingBufferProcessor (в комментарии, на который я дал ссылку выше, написано reactor.core.publisher.TopicProcessor, но с тех пор он тоже был удален)
  5. reactor.rx.action.Action
  6. reactor.rx.stream.GroupedStream (Я предполагаю reactor.core.publisher.GroupedFlux, поскольку Stream -> Flux, но что мне использовать вместо GroupedStream.lift?

Спасибо.

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
2
0
59
4
Перейти к ответу Данный вопрос помечен как решенный

Ответы 4

Ответ принят как подходящий

Хорошо, позвольте мне ответить на каждый пункт моего вопроса отдельным ответом (для удобства чтения) в надежде, что это может помочь другим в будущем.

  1. Dispatcher -> Scheduler
  2. SynchronousDispatcher -> Schedulers.immediate()

Старый:

import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Predicate;

import reactor.core.Dispatcher;
import reactor.core.dispatch.SynchronousDispatcher;

public final class ReactiveEventRouter {
  // dropping unimportant, unchanged parts and field modifiers for brevity
  SynchronousDispatcher syncDispatcher = new SynchronousDispatcher();
  CopyOnWriteArrayList<Subscription<?>> subscriptions = new CopyOnWriteArrayList<>();
  reactor.fn.Consumer<Throwable> errorConsumer = (th) -> log.error("Error while processing event", th);

  public <E> void tryRoute(final Object event, Subscription<E> 
subscription, Dispatcher dispatcher) {
    if (subscription.test(event)) {
      dispatcher.dispatch(event, subscription::accept, errorConsumer);
    }
  }

  public <E> void tryRoute(final Object event, Subscription<E> subscription) {
    tryRoute(event, subscription, syncDispatcher);
  }

  public void route(final Object event) {
    route(event, syncDispatcher);
  }

  public void route(final Object event, Dispatcher dispatcher) {
    subscriptions.forEach(subscription -> {
      if (subscription.test(event)) {
        dispatcher.dispatch(event, subscription::accept, errorConsumer);
      }
    });
  }
}

Новый:

import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Predicate;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public final class ReactiveEventRouter {
  Scheduler immediateScheduler = Schedulers.immediate();
  CopyOnWriteArrayList<Subscription<?>> subscriptions = new CopyOnWriteArrayList<>();
  Consumer<Throwable> errorConsumer = (th) -> log.error("Error while processing event", th);

  public <E> void tryRoute(final Object event, Subscription<E> subscription, Scheduler scheduler) {
    if (subscription.test(event)) {
      Mono.just(event)
          .subscribeOn(scheduler)
          .doOnError(errorConsumer)
          .doOnNext(subscription::accept)
          .subscribe();
    }
  }

  public <E> void tryRoute(final Object event, Subscription<E> subscription) {
    tryRoute(event, subscription, immediateScheduler);
  }

  public void route(final Object event) {
    route(event, immediateScheduler);
  }

  public void route(final Object event, Scheduler scheduler) {
    subscriptions.forEach(subscription -> {
      if (subscription.test(event)) {
        Mono.just(event)
            .subscribeOn(scheduler)
            .doOnError(errorConsumer)
            .doOnNext(subscription::accept)
            .subscribe();
      }
    });
  }
}

К вашему сведению: простой внутренний класс ReactiveEventRouter.Subscription не изменился, но я включил его сюда, чтобы избежать путаницы с org.reactivestreams.Subscription:

public static class Subscription<E> {
  final Predicate<Object> matcher;
  final Consumer<E> consumer; // this is java.util.function.Consumer

  Subscription(Predicate<Object> matcher, Consumer<E> consumer) {
    this.matcher = matcher;
    this.consumer = consumer;
  }

  boolean test(Object obj) {
    return matcher.test(obj);
  }

  void accept(Object evt) {
    consumer.accept((E) evt);
  }
}
  1. RingBufferDispatcher -> Schedulers.boundedElastic()

Старый (фрагменты):

import reactor.core.dispatch.RingBufferDispatcher;

private final RingBufferDispatcher ringBufferDispatcher = new RingBufferDispatcher("reactive-event-engine");

public void forwardEvent(Object event) {
  router.route(event, ringBufferDispatcher); // the router is ReactiveEventRouter from the answer which addressed items 1. and 2.
}

Новое (фрагменты):

import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

private final Scheduler eventScheduler = Schedulers.boundedElastic();

public void forwardEvent(Object event) {
  router.route(event, eventScheduler);
}

Я мог бы использовать TopicProcessor, поскольку он все еще здесь, в v3.5.15, но я не мог переварить использование устаревшего класса, который уже должен был быть удален в v3.5, поэтому я попробовал Sinks, но у меня не было времени глубоко вникать. изучить все аспекты Синкса.

  1. RingBufferProcessor -> Sinks.Many

Старый (фрагменты):

import reactor.core.processor.RingBufferProcessor;
import reactor.fn.Function;
import reactor.rx.Stream;
import reactor.rx.Streams;
import reactor.rx.stream.GroupedStream;

// fields with modifiers removed
// uses com.lmax.disruptor.LiteBlockingWaitStrategy
RingBufferProcessor<REvt<E, R>> processor = RingBufferProcessor.create();
Function<GroupedStream<VGrp<R>, REvt<E, R>>, Stream<PRes>> stream;
Function<REvt<E, R>, VGrp<RULE>> makeGroup;

private void start() {
  Streams.wrap(processor)
      .groupBy(makeGroup::apply)
      .flatMap(stream)
      .observe(this::alertObserver)
      .consume();
}

public void accept(REvt<E, R> evt) {
  processor.accept(evt); // this calls org.reactivestreams.Subscriber#onNext
}

Новое (фрагменты):

import java.util.function.Function;

import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Sinks;

// fields with modifiers removed
Sinks.Many<REvt<E, R>> eventSink = Sinks.many().multicast().onBackpressureBuffer();
Function<GroupedFlux<VGrp<R>, REvt<E, R>>, Flux<PRes>> stream;
Function<REvt<E, R>, VGrp<RULE>> makeGroup;

private void start() {
  eventSink.asFlux()
      .groupBy(makeGroup)
      .flatMap(stream)
      .doOnNext(this::alertObserver)
      .subscribe();
}

// this is the part I'm most skeptical of
// how else would you retry failed events?
// my thinking was: given that the whole pipeline is not fully event driven
// i.e. asynchronous, it might make sense to block in some place indefinitely
// until the roadblock is cleared
public void accept(REvt<E, R> evt) {
  while (eventSink.tryEmitNext(evt).isFailure()) {
    try {
      // is busy-waiting for .5s like this before retrying OK?
      Thread.sleep(500);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }
}
  1. Action
    Вместо каждого Action я реализовал кастомный FluxOperator. Я не привожу примеров кода, поскольку интерфейс очень похож и поэтому достаточно прост.

  2. Stream -> Flux
    GroupedStream -> GroupedFlux.

Где бы ни вызывался метод lift, теперь мы вызываем transform:

Старый:

// from is a GroupedStream
from.lift(() -> new CancellingAction<>(reactEvt -> ...))

Новый:

// from is a GroupedFlux
from.transform(flux -> new CancellingOperator<>(flux, reactEvt -> ...))

Другие вопросы по теме