Неужели нет официального руководства по обновлению реактора проекта с v2 до v3?
Я искал повсюду, даже используя waybackmachine, но мне не удалось найти документацию по пути обновления с v2 до v3 (с v2.0.6.RELEASE до v3.5.15, если быть более точным).
Я понимаю, что немного опоздал с этим вопросом, учитывая, что первая v3 вышла где-то в 2017 году, но текущая версия все еще v3, так что, думаю, еще не поздно, верно? ?
Лучший ресурс, который я нашел на данный момент, — это комментарий к проблеме GH, но у меня есть еще несколько вопросов.
Конкретно, что я использую в v3 вместо этих v2 классов:
reactor.core.Dispatcherreactor.core.dispatch.SynchronousDispatcherreactor.core.dispatch.RingBufferDispatcherreactor.core.processor.RingBufferProcessor (в комментарии, на который я дал ссылку выше, написано reactor.core.publisher.TopicProcessor, но с тех пор он тоже был удален)reactor.rx.action.Actionreactor.rx.stream.GroupedStream (Я предполагаю reactor.core.publisher.GroupedFlux, поскольку Stream -> Flux, но что мне использовать вместо GroupedStream.lift?Спасибо.




Хорошо, позвольте мне ответить на каждый пункт моего вопроса отдельным ответом (для удобства чтения) в надежде, что это может помочь другим в будущем.
Dispatcher -> SchedulerSynchronousDispatcher -> Schedulers.immediate()Старый:
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Predicate;
import reactor.core.Dispatcher;
import reactor.core.dispatch.SynchronousDispatcher;
public final class ReactiveEventRouter {
// dropping unimportant, unchanged parts and field modifiers for brevity
SynchronousDispatcher syncDispatcher = new SynchronousDispatcher();
CopyOnWriteArrayList<Subscription<?>> subscriptions = new CopyOnWriteArrayList<>();
reactor.fn.Consumer<Throwable> errorConsumer = (th) -> log.error("Error while processing event", th);
public <E> void tryRoute(final Object event, Subscription<E>
subscription, Dispatcher dispatcher) {
if (subscription.test(event)) {
dispatcher.dispatch(event, subscription::accept, errorConsumer);
}
}
public <E> void tryRoute(final Object event, Subscription<E> subscription) {
tryRoute(event, subscription, syncDispatcher);
}
public void route(final Object event) {
route(event, syncDispatcher);
}
public void route(final Object event, Dispatcher dispatcher) {
subscriptions.forEach(subscription -> {
if (subscription.test(event)) {
dispatcher.dispatch(event, subscription::accept, errorConsumer);
}
});
}
}
Новый:
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Predicate;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
public final class ReactiveEventRouter {
Scheduler immediateScheduler = Schedulers.immediate();
CopyOnWriteArrayList<Subscription<?>> subscriptions = new CopyOnWriteArrayList<>();
Consumer<Throwable> errorConsumer = (th) -> log.error("Error while processing event", th);
public <E> void tryRoute(final Object event, Subscription<E> subscription, Scheduler scheduler) {
if (subscription.test(event)) {
Mono.just(event)
.subscribeOn(scheduler)
.doOnError(errorConsumer)
.doOnNext(subscription::accept)
.subscribe();
}
}
public <E> void tryRoute(final Object event, Subscription<E> subscription) {
tryRoute(event, subscription, immediateScheduler);
}
public void route(final Object event) {
route(event, immediateScheduler);
}
public void route(final Object event, Scheduler scheduler) {
subscriptions.forEach(subscription -> {
if (subscription.test(event)) {
Mono.just(event)
.subscribeOn(scheduler)
.doOnError(errorConsumer)
.doOnNext(subscription::accept)
.subscribe();
}
});
}
}
К вашему сведению: простой внутренний класс ReactiveEventRouter.Subscription не изменился, но я включил его сюда, чтобы избежать путаницы с org.reactivestreams.Subscription:
public static class Subscription<E> {
final Predicate<Object> matcher;
final Consumer<E> consumer; // this is java.util.function.Consumer
Subscription(Predicate<Object> matcher, Consumer<E> consumer) {
this.matcher = matcher;
this.consumer = consumer;
}
boolean test(Object obj) {
return matcher.test(obj);
}
void accept(Object evt) {
consumer.accept((E) evt);
}
}
RingBufferDispatcher -> Schedulers.boundedElastic()Старый (фрагменты):
import reactor.core.dispatch.RingBufferDispatcher;
private final RingBufferDispatcher ringBufferDispatcher = new RingBufferDispatcher("reactive-event-engine");
public void forwardEvent(Object event) {
router.route(event, ringBufferDispatcher); // the router is ReactiveEventRouter from the answer which addressed items 1. and 2.
}
Новое (фрагменты):
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
private final Scheduler eventScheduler = Schedulers.boundedElastic();
public void forwardEvent(Object event) {
router.route(event, eventScheduler);
}
Я мог бы использовать TopicProcessor, поскольку он все еще здесь, в v3.5.15, но я не мог переварить использование устаревшего класса, который уже должен был быть удален в v3.5, поэтому я попробовал Sinks, но у меня не было времени глубоко вникать. изучить все аспекты Синкса.
RingBufferProcessor -> Sinks.ManyСтарый (фрагменты):
import reactor.core.processor.RingBufferProcessor;
import reactor.fn.Function;
import reactor.rx.Stream;
import reactor.rx.Streams;
import reactor.rx.stream.GroupedStream;
// fields with modifiers removed
// uses com.lmax.disruptor.LiteBlockingWaitStrategy
RingBufferProcessor<REvt<E, R>> processor = RingBufferProcessor.create();
Function<GroupedStream<VGrp<R>, REvt<E, R>>, Stream<PRes>> stream;
Function<REvt<E, R>, VGrp<RULE>> makeGroup;
private void start() {
Streams.wrap(processor)
.groupBy(makeGroup::apply)
.flatMap(stream)
.observe(this::alertObserver)
.consume();
}
public void accept(REvt<E, R> evt) {
processor.accept(evt); // this calls org.reactivestreams.Subscriber#onNext
}
Новое (фрагменты):
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Sinks;
// fields with modifiers removed
Sinks.Many<REvt<E, R>> eventSink = Sinks.many().multicast().onBackpressureBuffer();
Function<GroupedFlux<VGrp<R>, REvt<E, R>>, Flux<PRes>> stream;
Function<REvt<E, R>, VGrp<RULE>> makeGroup;
private void start() {
eventSink.asFlux()
.groupBy(makeGroup)
.flatMap(stream)
.doOnNext(this::alertObserver)
.subscribe();
}
// this is the part I'm most skeptical of
// how else would you retry failed events?
// my thinking was: given that the whole pipeline is not fully event driven
// i.e. asynchronous, it might make sense to block in some place indefinitely
// until the roadblock is cleared
public void accept(REvt<E, R> evt) {
while (eventSink.tryEmitNext(evt).isFailure()) {
try {
// is busy-waiting for .5s like this before retrying OK?
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}
Action
Вместо каждого Action я реализовал кастомный FluxOperator. Я не привожу примеров кода, поскольку интерфейс очень похож и поэтому достаточно прост.
Stream -> FluxGroupedStream -> GroupedFlux.
Где бы ни вызывался метод lift, теперь мы вызываем transform:
Старый:
// from is a GroupedStream
from.lift(() -> new CancellingAction<>(reactEvt -> ...))
Новый:
// from is a GroupedFlux
from.transform(flux -> new CancellingOperator<>(flux, reactEvt -> ...))