У меня есть абсолютно простой проект SpringBoot с простой конфигурацией и простым интеграционным тестом для тестирования WebSockets.
pom.xml
<?xml version = "1.0" encoding = "UTF-8"?>
<project xmlns = "http://maven.apache.org/POM/4.0.0"
xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation = "http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>sandbox.websocket</groupId>
<artifactId>websocket</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.2.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<properties>
<java.version>1.8</java.version>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
SpringBootApplication:
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
Конфигурация брокера сообщений
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/topic");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/greeting")
.withSockJS();
}
}
Тест интеграции, чтобы просто подключиться к серверу, подписаться на брокера сообщений и отправить сообщение.
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class})
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class WebSocketIntegrationTest {
@LocalServerPort
private int localServerPort;
private BlockingQueue<String> blockingQueue;
@Before
public void setup() {
blockingQueue = new LinkedBlockingDeque<>();
}
@Test
public void shouldReceiveAMessageFromTheServer() throws Exception {
String uri = "ws://localhost:" + localServerPort + "/greeting";
WebSocketStompClient stompClient = new WebSocketStompClient(
new SockJsClient(Collections.singletonList(
new WebSocketTransport(
new StandardWebSocketClient()))));
String message = "MESSAGE TEST";
ListenableFuture<StompSession> connect = stompClient.connect(uri, new StompSessionHandlerAdapter() {});
StompSession session = connect.get(1, SECONDS);
session.subscribe("/topic", new DefaultStompFrameHandler());
session.send("/topic", message.getBytes());
Assert.assertEquals(message, blockingQueue.poll(10, SECONDS));
}
class DefaultStompFrameHandler implements StompFrameHandler {
@Override
public Type getPayloadType(StompHeaders stompHeaders) {
return byte[].class;
}
@Override
public void handleFrame(StompHeaders stompHeaders, Object o) {
System.out.println("============================================================ = ");
System.out.println(new String((byte[]) o));
System.out.println("============================================================ = ");
blockingQueue.offer(new String((byte[]) o));
}
}
}
Если я запустил его и протестирую из клиента javascript, он будет работать как шарм. Если я запускаю интеграционный тест, он работает только иногда. Проблема в том, что иногда метод DefaultStompFrameHandler.handleFrame () не вызывается, поэтому в очередь ничего не сохраняется, и утверждение не выполняется.
Я написал перехватчик InboundChannel для перехвата кадров и вывода команд на консоль, и все четыре команды всегда печатаются (CONNECT, SUBSCRIBE, SEND, DISCONNECT).
Итак, все команды отправляются на сервер, включая SEND, но иногда (60-70%) StompFrameHandler не используется независимо от того, как долго установлен тайм-аут queue.poll.
Любая помощь, пожалуйста?
Установка уровня ведения журнала на DEBUG показала, что сервер получил событие подписки после публикации сообщения. Это вопрос о расовых условиях.
Глядя на реализацию DefaultStompSession, мы видим, что она не обрабатывает подтверждения, что очень затрудняет гарантию того, что подписка произошла.




ты нашел решение? У меня такая же проблема!