Резюме разрешения:
В большинстве примеров RSocket, которые в настоящее время существуют, акцептор на стороне сервера просто создается как новый объект (например, new MqttMessageService () ниже) даже в руководствах, связанных с SpringBoot. Это нормально, если вы генерируете пример содержимого прямо в классе приемника, но может привести к путанице, связанной с инъекцией зависимостей ниже, когда акцептор зависит от других bean-компонентов в контейнере.
Исходный вопрос:
Я получаю исключение NullPointerException при попытке потоковой передачи записей базы данных с использованием репозитория Spring Data Reactive Mongodb через сервер Java Rsocket.
Проблема в том, что во время отладки все компоненты работают отдельно: я могу получить запрошенные данные через тот же репозиторий Mongodb, а также могу передавать произвольно сгенерированные данные между тем же сервером и клиентом с помощью Rsocket.
Так что мне либо не хватает чего-то действительно простого, либо может возникнуть проблема с совместным использованием Reactive Mongodb и Rsocket.
Вот оригинальный конфигурация Rsocket на стороне сервера:
@Configuration
public class RsocketConfig {
@PostConstruct
public void startServer() {
RSocketFactory.receive()
.acceptor((setup, sendingSocket) -> Mono.just(new MqttMessageService()))
.transport(TcpServerTransport.create(8802))
.start()
.block()
.onClose()
}
}
А вот рабочий конфигурация Rsocket на стороне сервера с правильным DI:
@Configuration
public class RsocketConfig {
@Autowired
MqttMessageService messageService;
@PostConstruct
public void startServer() {
RSocketFactory.receive()
.acceptor((setup, sendingSocket) -> Mono.just(messageService))
.transport(TcpServerTransport.create(8802))
.start()
.block()
.onClose()
}
}
Вот реализация на стороне сервера AbstractRSocket, где исключение NullPointerException выбрасывается при возврате service.findAll ().
@Service
public class MqttMessageService extends AbstractRSocket {
@Autowired
private MqttMessageEntityService service;
@Override
public Flux<Payload> requestStream(Payload payload) {
return service.findAll()
.map(mqttMessageEntity -> DefaultPayload.create(mqttMessageEntity.toString()));
}
}
Вот реактивный репозиторий и связанный с ним сервис. Служба возвращает null при внедрении в реализацию AbstractRSocket сервера, но отлично работает при внедрении в другие классы:
@Service
public class MqttMessageEntityService {
@Autowired
private MqttMessageEntityRepository repository;
public Flux<MqttMessageEntity> findAll() {
return repository.findAll();
}
}
public interface MqttMessageEntityRepository extends ReactiveMongoRepository<MqttMessageEntity, String> {
}
А вот код сторона клиента, который отлично работает с содержимым теста:
@Configuration
public class RsocketConfig {
@PostConstruct
public void testRsocket() {
RSocket rSocketClient = RSocketFactory
.connect()
.transport(TcpClientTransport.create(8802))
.start()
.block();
rSocketClient
.requestStream(DefaultPayload.create(""))
.blockLast();
}
}
Я могу быть немного выше моего уровня знаний здесь, и ресурсы по этой теме очень ограничены, поэтому я ценю любые подсказки к решению :)




Касательно
@PostConstruct
public void startServer() {
RSocketFactory.receive()
.acceptor((setup, sendingSocket) -> Mono.just(new MqttMessageService()))
.transport(TcpServerTransport.create(8802))
.start()
.block()
.onClose();
}
Вы используете для поддержания сервера в рабочем состоянии? В таком случае добавьте еще один блок после onClose ().
Значение messageEntityService равно нулю? Потому что, похоже, это единственное, что могло бы вызвать ошибку, если бы переменные topicStart и module не совпадали. Особенно, если работает другой код - я не вижу ничего, что могло бы вызвать проблемы со стороны RSocket.
Я думаю, этот поток может помочь вам понять, почему Spring вводит нуль в вашу переменную @Autowired: stackoverflow.com/questions/19896870/…
Хаха спасибо! Первое, что я проверил, это то, что служба управляется контейнером, но пропустил ту часть, где я должен был ввести ее в качестве акцептора на стороне сервера, вместо того, чтобы создавать новый экземпляр вне контейнера Spring. Я был немного сбит с толку, так как все примеры, которые я видел, даже те, которые связаны с SpringBoot, использовали этот способ добавления акцептора, но теперь я понял, что в этих случаях не имеет значения, поддерживается ли он Spring, поскольку все они использовали данные, сгенерированные внутри акцептора, и не зависели от других служб (например, db в моем случае)
Ах да, вы правы! MessageEntityService не возвращает нулевое значение, сама служба имеет нулевое значение. Это неожиданно, так как она отлично работает при внедрении в другой класс. Ты хоть представляешь, как это могло происходить? Я обновляю свой вопрос, добавляя более подробную информацию о реактивном репозитории mongo и messageEntityService, поскольку он все еще может быть актуален в этом вопросе.