«Вы считаете обязательным, что первая строка будет выполнена, а затем вторая, что не так в случае с webflux. Вы должны думать о событиях-обратных вызовах».
Я согласен с этой оценкой (у меня БОЛЬШОЙ опыт делать вещи «императивно») и надеюсь, что люди помогут мне с коррекцией руля того, как я рассматриваю пространство решений. Я публикую три разные версии одной и той же «функциональности», только одна из которых работает (и я открыт для комментариев о том, как эта версия может/должна быть изменена, чтобы лучше соответствовать реактивной/функциональной реализации).
Под руководством/помощью человека, проводившего цитируемую оценку, я смог заставить `DemoPOJOHandler.add(ServerRequest)' работать. Код вместе с выводом на уровне отладки показан ниже. Я отмечаю, что сразу после строк HTTP POST "/v2/DemoPOJO" и *"Mapped to mil.navy..." есть запись из реактор.netty.channel.FluxReceive с указанием "Подписка на входящий приемник..". Кажется, это ключевое действие, отсутствующее в моих двух других попытках.
Мой конкретный (хотя и «длинный») вопрос:
Я «понимаю», что мысль о том, что оператор № 1 будет выполнен, затем оператор № 2 и т. д., является «императивным» представлением пространства решений. Но в приведенном ниже примере это похоже на то, что происходит. Оператор регистратор выполняется в 08:38:34.217, за ним следует подписка в 08:38:34.251, которая затем создает экземпляр DemoPOJO в 08:39:34.267, после чего все "работает".
Но цепочка в последовательности request.bodyToMono()... не выглядит существенно отличной от цепочки методов в императивном коде (например, 'Целое.toString().indexOf()'), за исключением лямбда-выражений (или является ли наличие лямбда-выражений причиной того, что «все меняется»? ). Итак, если последовательность request.bodyToMono()... теоретически не требует «.then()» или «.switchIfEmpty()», то почему основная последовательность request.bodyToMono()... не выполняет «service.add(demoPOJO)»? Я понимаю, что Mono не подписывается, но почему кажется, что дополнительные операторы в цепочке необходимы для подписки и добавления POJO в репо?
Этот код успешно выполняется...
@Component
public class DemoPOJOHandler {
private Logger logger = LoggerFactory.getLogger(DemoPOJOHandler.class);
@Autowired
private DemoPOJOService service;
public Mono<ServerResponse> add(ServerRequest request) {
logger.debug("DemoPOJOHandler.add( ServerRequest )");
return request.bodyToMono(DemoPOJO.class).doOnSuccess(demoPOJO -> service.add(demoPOJO))
.then(ServerResponse.ok().build())
.switchIfEmpty(ServerResponse.badRequest()
.contentType(MediaType.APPLICATION_JSON)
.build());
}
}
2019-07-25 08:38:34.144 DEBUG 11992 --- [ctor-http-nio-2] io.netty.buffer.AbstractByteBuf : -Dio.netty.buffer.checkAccessible: true
2019-07-25 08:38:34.145 DEBUG 11992 --- [ctor-http-nio-2] io.netty.buffer.AbstractByteBuf : -Dio.netty.buffer.checkBounds: true
2019-07-25 08:38:34.145 DEBUG 11992 --- [ctor-http-nio-2] i.n.util.ResourceLeakDetectorFactory : Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@7a8a4d6a
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-2] r.n.http.server.HttpServerOperations : [id: 0xa2da3d98, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62644] New http connection, requesting read
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] New http connection, requesting read
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-3] reactor.netty.channel.BootstrapHandlers : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] Initialized pipeline DefaultChannelPipeline{(BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (reactor.left.httpTrafficHandler = reactor.netty.http.server.HttpTrafficHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-2] reactor.netty.channel.BootstrapHandlers : [id: 0xa2da3d98, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62644] Initialized pipeline DefaultChannelPipeline{(BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (reactor.left.httpTrafficHandler = reactor.netty.http.server.HttpTrafficHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-3] io.netty.util.Recycler : -Dio.netty.recycler.maxCapacityPerThread: 4096
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-3] io.netty.util.Recycler : -Dio.netty.recycler.maxSharedCapacityFactor: 2
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-3] io.netty.util.Recycler : -Dio.netty.recycler.linkCapacity: 16
2019-07-25 08:38:34.157 DEBUG 11992 --- [ctor-http-nio-3] io.netty.util.Recycler : -Dio.netty.recycler.ratio: 8
2019-07-25 08:38:34.173 DEBUG 11992 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] Increasing pending responses, now 1
2019-07-25 08:38:34.173 DEBUG 11992 --- [ctor-http-nio-3] reactor.netty.http.server.HttpServer : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] Handler is being applied: org.springframework.http.server.reactive.ReactorHttpHandlerAdapter@579c20c6
2019-07-25 08:38:34.195 DEBUG 11992 --- [ctor-http-nio-3] o.s.w.s.adapter.HttpWebHandlerAdapter : [5f552130] HTTP POST "/v2/DemoPOJO"
2019-07-25 08:38:34.217 DEBUG 11992 --- [ctor-http-nio-3] o.s.w.r.f.s.s.RouterFunctionMapping : [5f552130] Mapped to mil.navy.demo.DemoPOJO.DemoPOJORouter$$Lambda$258/1123559518@22a8277c
2019-07-25 08:38:34.217 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJOHandler : DemoPOJOHandler.add( ServerRequest )
2019-07-25 08:38:34.251 DEBUG 11992 --- [ctor-http-nio-3] reactor.netty.channel.FluxReceive : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] Subscribing inbound receiver [pending: 0, cancelled:false, inboundDone: false]
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJO : DemoPOJO.DemoPOJO( 666, foo_666, 10666 )
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJO : DemoPOJO.toString()
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] o.s.http.codec.json.Jackson2JsonDecoder : [5f552130] Decoded [666 :: foo_666 :: 10666]
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJOService : DemoPOJOService.add( DemoPOJO )
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJORepo : DemoPOJORepo.add( DemoPOJO )
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJO : DemoPOJO.getId()
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJO : DemoPOJO.getId()
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJORepo : DemoPOJORepo.add( DemoPOJO ) -> adding for id 666
2019-07-25 08:38:34.267 DEBUG 11992 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJO : DemoPOJO.getId()
2019-07-25 08:38:34.272 DEBUG 11992 --- [ctor-http-nio-3] o.s.w.s.adapter.HttpWebHandlerAdapter : [5f552130] Completed 200 OK
2019-07-25 08:38:34.273 DEBUG 11992 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] Last HTTP response frame
2019-07-25 08:38:34.273 DEBUG 11992 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] No sendHeaders() called before complete, sending zero-length header
2019-07-25 08:38:34.274 DEBUG 11992 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] Decreasing pending responses, now 0
2019-07-25 08:38:34.275 DEBUG 11992 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations : [id: 0x5f552130, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62645] Last HTTP packet was sent, terminating the channel
2019-07-25 08:38:41.720 DEBUG 11992 --- [169.254.211.161] sun.rmi.transport.tcp : RMI TCP Connection(4)-169.254.211.161: (port 62610) connection closed
2019-07-25 08:38:41.720 DEBUG 11992 --- [169.254.211.161] sun.rmi.transport.tcp : RMI TCP Connection(4)-169.254.211.161: close connection
Этот код выполняется «без ошибок», но никогда не подписывается и поэтому никогда не выполняет часть цепочки «doOnSuccess(...)». Но вроде как должно? Что «волшебного» в соединении отдельного оператора возврат... с оператором 'запрос.bodyToMono(...)' с помощью «.потом(...)»?
@Component
public class DemoPOJOHandler {
private Logger logger = LoggerFactory.getLogger(DemoPOJOHandler.class);
@Autowired
private DemoPOJOService service;
public Mono<ServerResponse> add(ServerRequest request) {
logger.debug("DemoPOJOHandler.add( ServerRequest )");
request.bodyToMono(DemoPOJO.class).doOnSuccess(demoPOJO -> System.out.println("DEMO -> " + demoPOJO.toString()));
return ServerResponse.ok().build();
}
}
2019-07-25 08:40:16.155 DEBUG 17064 --- [169.254.211.161] sun.rmi.transport.tcp : RMI TCP Connection(4)-169.254.211.161: (port 62661) connection closed
2019-07-25 08:40:16.155 DEBUG 17064 --- [169.254.211.161] sun.rmi.transport.tcp : RMI TCP Connection(4)-169.254.211.161: close connection
2019-07-25 08:40:18.248 DEBUG 17064 --- [ctor-http-nio-2] io.netty.buffer.AbstractByteBuf : -Dio.netty.buffer.checkAccessible: true
2019-07-25 08:40:18.248 DEBUG 17064 --- [ctor-http-nio-2] io.netty.buffer.AbstractByteBuf : -Dio.netty.buffer.checkBounds: true
2019-07-25 08:40:18.248 DEBUG 17064 --- [ctor-http-nio-2] i.n.util.ResourceLeakDetectorFactory : Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@3860465a
2019-07-25 08:40:18.266 DEBUG 17064 --- [ctor-http-nio-2] r.n.http.server.HttpServerOperations : [id: 0x768a1f21, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62695] New http connection, requesting read
2019-07-25 08:40:18.266 DEBUG 17064 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] New http connection, requesting read
2019-07-25 08:40:18.267 DEBUG 17064 --- [ctor-http-nio-3] reactor.netty.channel.BootstrapHandlers : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] Initialized pipeline DefaultChannelPipeline{(BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (reactor.left.httpTrafficHandler = reactor.netty.http.server.HttpTrafficHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2019-07-25 08:40:18.267 DEBUG 17064 --- [ctor-http-nio-2] reactor.netty.channel.BootstrapHandlers : [id: 0x768a1f21, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62695] Initialized pipeline DefaultChannelPipeline{(BootstrapHandlers$BootstrapInitializerHandler#0 = reactor.netty.channel.BootstrapHandlers$BootstrapInitializerHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (reactor.left.httpTrafficHandler = reactor.netty.http.server.HttpTrafficHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2019-07-25 08:40:18.273 DEBUG 17064 --- [ctor-http-nio-3] io.netty.util.Recycler : -Dio.netty.recycler.maxCapacityPerThread: 4096
2019-07-25 08:40:18.273 DEBUG 17064 --- [ctor-http-nio-3] io.netty.util.Recycler : -Dio.netty.recycler.maxSharedCapacityFactor: 2
2019-07-25 08:40:18.273 DEBUG 17064 --- [ctor-http-nio-3] io.netty.util.Recycler : -Dio.netty.recycler.linkCapacity: 16
2019-07-25 08:40:18.273 DEBUG 17064 --- [ctor-http-nio-3] io.netty.util.Recycler : -Dio.netty.recycler.ratio: 8
2019-07-25 08:40:18.285 DEBUG 17064 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] Increasing pending responses, now 1
2019-07-25 08:40:18.289 DEBUG 17064 --- [ctor-http-nio-3] reactor.netty.http.server.HttpServer : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] Handler is being applied: org.springframework.http.server.reactive.ReactorHttpHandlerAdapter@7fa4fcbc
2019-07-25 08:40:18.297 DEBUG 17064 --- [ctor-http-nio-3] o.s.w.s.adapter.HttpWebHandlerAdapter : [51900c31] HTTP POST "/v2/DemoPOJO"
2019-07-25 08:40:18.315 DEBUG 17064 --- [ctor-http-nio-3] o.s.w.r.f.s.s.RouterFunctionMapping : [51900c31] Mapped to mil.navy.demo.DemoPOJO.DemoPOJORouter$$Lambda$262/1446001495@27a07cfc
2019-07-25 08:40:18.316 DEBUG 17064 --- [ctor-http-nio-3] mil.navy.demo.DemoPOJO.DemoPOJOHandler : DemoPOJOHandler.add( ServerRequest )
2019-07-25 08:40:18.358 DEBUG 17064 --- [ctor-http-nio-3] o.s.w.s.adapter.HttpWebHandlerAdapter : [51900c31] Completed 200 OK
2019-07-25 08:40:18.359 DEBUG 17064 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] Last HTTP response frame
2019-07-25 08:40:18.359 DEBUG 17064 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] No sendHeaders() called before complete, sending zero-length header
2019-07-25 08:40:18.360 DEBUG 17064 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] Decreasing pending responses, now 0
2019-07-25 08:40:18.361 DEBUG 17064 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] Last HTTP packet was sent, terminating the channel
2019-07-25 08:40:18.366 DEBUG 17064 --- [ctor-http-nio-3] r.n.channel.ChannelOperationsHandler : [id: 0x51900c31, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:62696] No ChannelOperation attached. Dropping:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 0a 20 20 20 20 22 69 64 22 3a 20 36 36 36 2c |{. "id": 666,|
|00000010| 0a 20 20 20 20 22 6e 61 6d 65 22 3a 20 22 66 6f |. "name": "fo|
|00000020| 6f 5f 36 36 36 22 2c 0a 20 20 20 20 22 76 61 6c |o_666",. "val|
|00000030| 75 65 22 3a 20 31 30 36 36 36 0a 7d |ue": 10666.} |
+--------+-------------------------------------------------+----------------+
Этот код просто взрывается NPE. Моя ошибочная логика была такова: «Ну, если «сделать при успехе (...)» не происходит, потому что Мононуклеоз не подписан, тогда «подпишитесь». Очевидно, это не было решением. Менее очевидно (для меня, на данный момент времени) , вот почему?".
@Component
public class DemoPOJOHandler {
private Logger logger = LoggerFactory.getLogger(DemoPOJOHandler.class);
@Autowired
private DemoPOJOService service;
public Mono<ServerResponse> add(ServerRequest request) {
logger.debug("DemoPOJOHandler.add( ServerRequest )");
request.bodyToMono(DemoPOJO.class).doOnSuccess(demoPOJO -> System.out.println("DEMO -> " + demoPOJO.toString()))
.subscribe();
return ServerResponse.ok().build();
}
}
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.NullPointerException
Caused by: java.lang.NullPointerException: null
at mil.navy.demo.DemoPOJO.DemoPOJOHandler.lambda$add$2(DemoPOJOHandler.java:73) ~[classes/:na]
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete(MonoPeekTerminal.java:311) [reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:1743) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:1743) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete(MonoSingle.java:155) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:794) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:560) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:540) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:426) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:794) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:560) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:540) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:426) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.DrainUtils.postCompleteDrain(DrainUtils.java:131) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.DrainUtils.postComplete(DrainUtils.java:186) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxMapSignal$FluxMapSignalSubscriber.onComplete(FluxMapSignal.java:213) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:390) ~[reactor-netty-0.8.9.RELEASE.jar:0.8.9.RELEASE]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:197) ~[reactor-netty-0.8.9.RELEASE.jar:0.8.9.RELEASE]
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:338) ~[reactor-netty-0.8.9.RELEASE.jar:0.8.9.RELEASE]
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:350) [reactor-netty-0.8.9.RELEASE.jar:0.8.9.RELEASE]
at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:399) [reactor-netty-0.8.9.RELEASE.jar:0.8.9.RELEASE]
at reactor.netty.http.server.HttpServerOperations.cleanHandlerTerminate(HttpServerOperations.java:519) [reactor-netty-0.8.9.RELEASE.jar:0.8.9.RELEASE]
at reactor.netty.http.server.HttpTrafficHandler.operationComplete(HttpTrafficHandler.java:314) [reactor-netty-0.8.9.RELEASE.jar:0.8.9.RELEASE]
(... lots of stuff deleted to fit posting constraints ...)
2019-07-25 10:44:43.212 DEBUG 10544 --- [ctor-http-nio-3] r.n.channel.ChannelOperationsHandler : [id: 0xa62e89df, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:64515] No ChannelOperation attached. Dropping:
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 7b 0a 20 20 20 20 22 69 64 22 3a 20 36 36 36 2c |{. "id": 666,|
|00000010| 0a 20 20 20 20 22 6e 61 6d 65 22 3a 20 22 66 6f |. "name": "fo|
|00000020| 6f 5f 36 36 36 22 2c 0a 20 20 20 20 22 76 61 6c |o_666",. "val|
|00000030| 75 65 22 3a 20 31 30 36 36 36 0a 7d |ue": 10666.} |
+--------+-------------------------------------------------+----------------+
2019-07-25 10:45:08.112 DEBUG 10544 --- [169.254.211.161] sun.rmi.transport.tcp : RMI TCP Connection(3)-169.254.211.161: (port 64485) connection closed
2019-07-25 10:45:08.112 DEBUG 10544 --- [169.254.211.161] sun.rmi.transport.tcp : RMI TCP Connection(3)-169.254.211.161: close connection




Поскольку я был тем, кого вы процитировали вверху, я попытаюсь ответить на ваши вопросы.
Сначала нужно поговорить о «неблокировке». Что такое "неблокирующий"? ну неблокировка основана на событиях. Базовый сервер, Netty, не работает с назначением одного потока для каждого запроса, а вместо этого работает с цепочкой событий и очередями событий.
Итак, когда кто-то подписывается, netty создаст базовую очередь событий (вроде), которая в основном будет работать как:
x <- y <- z
Чтобы получить x, нам нужно разрешить y, а чтобы получить y, нам нужно разрешить z. Это то, что люди обычно называют «функциональной» частью в этом типе программирования.
Одна из самых распространенных ошибок, которую я вижу, когда люди начинают заниматься реактивным программированием, заключается в том, что они не понимают, что subscriber — это призвание client. Ваше весеннее приложение — это publisher, а каждый client вызов вашего сервиса — это subscriber.
Вы никогда не должны подписываться в своем приложении
зачем вашему издательскому приложению подписываться на себя? когда вы объясняете это таким образом, люди обычно понимают.
Итак, давайте посмотрим на ваши примеры, и я возьму их в обратном порядке:
Пример 3:
public Mono<ServerResponse> add(ServerRequest request) {
logger.debug("DemoPOJOHandler.add( ServerRequest )");
request.bodyToMono(DemoPOJO.class).doOnSuccess(demoPOJO -> System.out.println("DEMO -> " + demoPOJO.toString()))
.subscribe();
return ServerResponse.ok().build();
}
Здесь мы вводим метод императивным способом, мы даем ему запрос. ServerRequest - это конкретный объект, но как только вы это сделаете bodyToMono, вы вернете Mono<DemoPOJO> который, в свою очередь, является обернутым CompletableFuture, внутри которого есть вычисление (чтобы взять тело в запросе и поместите его в свой dto)
Как только это вычисление будет выполнено, Mono перейдет в состояние success и вызовет то, что последует в цепочке, поэтому doOnSuccess сработает. Когда doOnSuccess будет сделано, он вернется Mono<Void>.
Вот где ваша проблема, когда doOnSuccess закончилось, вы subscribe. Итак, что вы здесь делаете, так это то, что как только кто-то опубликует ServerRequest в вашем приложении, Netty (сервер) настроит цепочку событий, и в этой цепочке событий приложение подпишется на себя.
Здесь цепочка завершается подпиской приложения на себя. Таким образом, приложение является собственным клиентом.
Пример 2:
public Mono<ServerResponse> add(ServerRequest request) {
logger.debug("DemoPOJOHandler.add( ServerRequest )");
request.bodyToMono(DemoPOJO.class).doOnSuccess(demoPOJO -> System.out.println("DEMO -> " + demoPOJO.toString()));
return ServerResponse.ok().build();
}
Здесь мы делаем то же самое, что и в примере 3, просто когда цепочка событий настроена, запрос отображается на DTO, затем мы делаем что-то в doOnSuccess, но тогда цепочка разрывается. doOnSuccess сигнализирует о том, что сделано, но дальше ничего не слушает.
Так что здесь цепочка событий разорвана, она неполная. Ничего не происходит, пока вы не подпишитесь, но так как цепочка разорвана, никто не может subscribe следовательно ничего не произойдет.
Пример 1:
public Mono<ServerResponse> add(ServerRequest request) {
logger.debug("DemoPOJOHandler.add( ServerRequest )");
return request.bodyToMono(DemoPOJO.class).doOnSuccess(demoPOJO -> service.add(demoPOJO))
.then(ServerResponse.ok().build())
.switchIfEmpty(ServerResponse.badRequest()
.contentType(MediaType.APPLICATION_JSON)
.build());
}
Здесь цепочка завершена. Что-то сигнализирует о чем-то, это сигнализирует о чем-то, когда что-то завершается, срабатывает следующее, и следующее, и следующее, и следующее.
Вызывающий клиент отправляет данные, сервер настраивает цепочку событий, цепочка завершается, так что клиент подписывается, клиент подписывается, а затем запускается цепочка событий и запускает все обратные вызовы и возвращает данные.
Flux<T> и Mono<T> являются классами-оболочками вокруг монады CompletableFuture<T>. Optional<T> и Stream<T> также являются монадами, а монады пришли из функционального мира, такого как язык программирования Haskell. Хороший способ понять, как они работают, — узнать больше о монадах.
Если вы хотите в целом понять больше о монадах, я собираюсь без зазрения совести опубликовать свою собственную статью об этом:
Написать монаду на Java, серьезно?
Я хорошо читал также Введение в реактивное программирование, я рекомендую просмотреть все их примеры.
Не беспокойтесь, это тяжело. Я начал изучать функциональный язык Haskell пару лет назад, поначалу это было сложно, но со временем становится все легче и легче.
Спасибо, что нашли время ответить на мои вопросы. Честно говоря, я считаю себя довольно приличным программистом, но никогда не был поклонником использования "чужого кода"... только потому, что это требует понимания того, как его предполагается использовать. С Webflux и Reactor, однако... ну, это "другая история", и я испытываю крутую кривую обучения.