Я создал простое демонстрационное приложение, чтобы познакомиться с новым WebSocketClient. Я нашел руководство по адресу: https://stackify.com/reactive-spring-5/ (Реактивные клиенты WebSocket):
@Bean
CommandLineRunner demo() {
return args -> {
Flux<String> input = Flux.<String>generate(sink -> sink.next("This is a test"))
.delayElements(Duration.ofSeconds(1));
WebSocketClient client = new ReactorNettyWebSocketClient();
client.execute(new URI("ws://echo.websocket.org"), session ->
session.send(input.map(session::textMessage))
.thenMany(session.receive().map(WebSocketMessage::getPayloadAsText).log())
.then())
.subscribe();
};
}
Мое приложение не вылетает, но я вообще не вижу журналов, я что-то делаю не так? Я ожидаю получить то же значение, возвращаемое службой echo websocket.
Если я заменю входную переменную следующим образом, она будет работать:
Mono<String> input = Mono.just("This is a message");
В чем разница и как я могу заставить его работать с потоком String?
Это работает для меня:
package codependent;
import org.junit.Test;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
public class SomeTest {
@Test
public void myTest() throws URISyntaxException, InterruptedException {
CountDownLatch latch = new CountDownLatch(20);
EmitterProcessor<Object> output = EmitterProcessor.create();
Flux<String> input = Flux.<String>generate(sink -> sink.next("This is a test"))
.delayElements(Duration.ofSeconds(1));
WebSocketClient client = new ReactorNettyWebSocketClient();
Mono<Void> sessionMono = client.execute(new URI("ws://echo.websocket.org"), session ->
session.send(input.map(session::textMessage))
.thenMany(session.receive()
.map(WebSocketMessage::getPayloadAsText).log()
.subscribeWith(output).then()).then());
output.doOnSubscribe(s -> sessionMono.subscribe())
.subscribe(i -> {
System.out.println("Received " + i);
latch.countDown();
});
latch.await();
}
}
Бревно:
18:14:10.205 [reactor-http-nio-4] DEBUG reactor.ipc.netty.http.client.HttpClient - [id: 0x5f974732, L:/192.168.2.193:60438 - R:echo.websocket.org/174.129.224.73:80] READ: 16B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 81 0e 54 68 69 73 20 69 73 20 61 20 74 65 73 74 |..This is a test|
+--------+-------------------------------------------------+----------------+
18:14:10.205 [reactor-http-nio-4] DEBUG io.netty.handler.codec.http.websocketx.WebSocket08FrameDecoder - Decoding WebSocket Frame opCode=1
18:14:10.205 [reactor-http-nio-4] DEBUG io.netty.handler.codec.http.websocketx.WebSocket08FrameDecoder - Decoding WebSocket Frame length=14
18:14:10.205 [reactor-http-nio-4] INFO reactor.Flux.Map.1 - onNext(This is a test)
Received This is a test
Я не могу понять, что это делает
output.doOnSubscribe(s -> sessionMono.subscribe()) .subscribe(i -> { System.out.println("Received " + i); latch.countDown(); });.outputуже подписался на сообщенияsession.receive(). В чем заключается работа методаdoOnSubscribe.