Я пытаюсь получать сообщения от kafka и отправлять их в RSocket с помощью Spring. Публикация на стороне сервера на Spring Java и на стороне клиента с помощью React
@Configuration
@EnableConfigurationProperties(RsocketConsumerProperties.class)
public class RsocketConsumerConfiguration {
@Bean
public Function<Integer, Mono<Integer>> rsocketConsumer(RSocketRequester.Builder builder,
RsocketConsumerProperties rsocketConsumerProperties) {
RSocketRequester rSocketRequester = builder.websocket(URI.create("ws://localhost:7000/"));
return input -> rSocketRequester.route(rsocketConsumerProperties.getRoute()).data(input).retrieveMono(Integer.class);
}
}
@EnableBinding(Sink.class)
public class Listener {
@Autowired
private Function<Integer, Mono<Integer>> rsocketConsumer;
@StreamListener(Sink.INPUT)
public void fireAndForget(Integer val) {
System.out.println(val);
rsocketConsumer.apply(val).subscribe();
}
}
@Controller
public class ServerController {
@MessageMapping("data")
public Mono<Integer> hello(Integer integer) {
return Mono.just(integer);
}
}
Что я делаю неправильно на стороне сервера, потому что мой клиент подключен, но не может получать новые сообщения
client.connect().subscribe({
onComplete: socket => {
socket.fireAndForget({
data: { message: "hello from javascript!" },
metadata: null
});
},
onError: error => {
console.info("got error");
console.error(error);
},
onSubscribe: cancel => {
/* call cancel() to abort */
console.info("subscribe!");
console.info(cancel);
// cancel.cancel();
}
});





Вы делаете это requester.route("input").data("Welcome to Rsocket").send(); там, где у нас есть это:
/**
* Perform a {@link RSocket#fireAndForget fireAndForget} sending the
* provided data and metadata.
* @return a completion that indicates if the payload was sent
* successfully or not. Note, however that is a one-way send and there
* is no indication of whether or how the event was handled on the
* remote end.
*/
Mono<Void> send();
Видишь - Mono? Это означает, что на него нужно подписаться, чтобы инициировать обработку реактивного потока. Подробнее о проекте Reactor: https://projectreactor.io/
С другой стороны, непонятно, что такое сервер и что такое клиент в вашем случае... ты делаешь это:
/**
* Build an {@link RSocketRequester} with an
* {@link io.rsocket.core.RSocketClient} that connects over WebSocket to
* the given URL. The requester can be used to make requests
* concurrently. Requests are made over a shared connection that is also
* re-established as needed when further requests are made.
* @param uri the URL to connect to
* @return the created {@code RSocketRequester}
* @since 5.3
*/
RSocketRequester websocket(URI uri);
И я бы сказал, что это означает клиент в коде, который вы показываете. Сервер находится на другой стороне, где этот 7000 порт открыт для ws:// протокола. Итак, убедитесь, что вы понимаете и правильно настроили все части. Например, я не понимаю, зачем вам @RestController в вашем Listener классе...
Вы идете правильным путем с RSocketRequester. Только то, что вы упускаете, является subscribe() к этому send() результату. вы также можете ознакомиться с последней версией Spring Cloud Stream с функциональным API, а также изучить этот проект: spring.io/projects/spring-cloud-stream-applications. У него есть приложение RScoket Sink для таких задач, как ваша. Вместе с Kafka Binder вы достигнете своей цели без написания кода!
Привет еще раз. Я пытаюсь использовать этот. Похоже, то, что мне нужно. spring.io/blog/2020/08/03/…. Но я получаю сообщение об ошибке. Поле rsocketConsumer в com.healthservices.streaming.client.listener.Listener требует bean-компонента типа «org.springframework.cglib.core.internal.Function», который не может быть найден. Попытка ввести rsocketConsumer
Почему вы пишете собственное приложение? Что не так с этим RSocket Sink из коробки? В любом случае, я думаю, что вам не хватает этого spring.cloud.function.definition=rsocketConsumer в application.properties для вашей пользовательской раковины.
@meuhedetmeuhedet, это полезный ответ. Вам следует подумать о том, чтобы принять его (используя значок V и обязательно проголосовать за него).
я вижу, что он срабатывает с помощью rsocketConsumer.apply(val).subscribe(); Проверяем контроллером. Но не вижу этого в приложении реакции
Вероятно, это отдельная история, которая заслуживает отдельной ветки SO...
Привет Артем. Спасибо за ответ. Я исправил restcontroller, так как он был неправильным. Я пытаюсь получить сообщения от kafka, а затем отправить их в rsocket. Что бы вы посоветовали, как это сделать? Я хочу использовать spring для чтения сообщений от kafka и отправки их для реагирования (javascript)