Получить данные из kafka и отправить их в RSocket

Я пытаюсь получать сообщения от kafka и отправлять их в RSocket с помощью Spring. Публикация на стороне сервера на Spring Java и на стороне клиента с помощью React

@Configuration
@EnableConfigurationProperties(RsocketConsumerProperties.class)
public class RsocketConsumerConfiguration {

    @Bean
    public Function<Integer, Mono<Integer>> rsocketConsumer(RSocketRequester.Builder builder,
                                                            RsocketConsumerProperties rsocketConsumerProperties) {
        RSocketRequester rSocketRequester = builder.websocket(URI.create("ws://localhost:7000/"));
        return input -> rSocketRequester.route(rsocketConsumerProperties.getRoute()).data(input).retrieveMono(Integer.class);
    }
}
@EnableBinding(Sink.class)
public class Listener {

    @Autowired
    private Function<Integer, Mono<Integer>> rsocketConsumer;


    @StreamListener(Sink.INPUT)
    public void fireAndForget(Integer val) {
        System.out.println(val);
        rsocketConsumer.apply(val).subscribe();
    }
}
@Controller
public class ServerController {

    @MessageMapping("data")
    public Mono<Integer> hello(Integer integer) {
        return Mono.just(integer);
    }

}

Что я делаю неправильно на стороне сервера, потому что мой клиент подключен, но не может получать новые сообщения

  client.connect().subscribe({
    onComplete: socket => {
        socket.fireAndForget({
          data: { message: "hello from javascript!" },
          metadata: null
        });
      },
      onError: error => {
        console.info("got error");
        console.error(error);
      },
      onSubscribe: cancel => {
        /* call cancel() to abort */
        console.info("subscribe!");
        console.info(cancel);
        // cancel.cancel();
      }
    });
   
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Навигация по приложениям React: Исчерпывающее руководство по React Router
Навигация по приложениям React: Исчерпывающее руководство по React Router
React Router стала незаменимой библиотекой для создания одностраничных приложений с навигацией в React. В этой статье блога мы подробно рассмотрим...
Массив зависимостей в React
Массив зависимостей в React
Все о массиве Dependency и его связи с useEffect.
2
0
758
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вы делаете это requester.route("input").data("Welcome to Rsocket").send(); там, где у нас есть это:

   /**
     * Perform a {@link RSocket#fireAndForget fireAndForget} sending the
     * provided data and metadata.
     * @return a completion that indicates if the payload was sent
     * successfully or not. Note, however that is a one-way send and there
     * is no indication of whether or how the event was handled on the
     * remote end.
     */
    Mono<Void> send();

Видишь - Mono? Это означает, что на него нужно подписаться, чтобы инициировать обработку реактивного потока. Подробнее о проекте Reactor: https://projectreactor.io/

С другой стороны, непонятно, что такое сервер и что такое клиент в вашем случае... ты делаешь это:

    /**
     * Build an {@link RSocketRequester} with an
     * {@link io.rsocket.core.RSocketClient} that connects over WebSocket to
     * the given URL. The requester can be used to make requests
     * concurrently. Requests are made over a shared connection that is also
     * re-established as needed when further requests are made.
     * @param uri the URL to connect to
     * @return the created {@code RSocketRequester}
     * @since 5.3
     */
    RSocketRequester websocket(URI uri);

И я бы сказал, что это означает клиент в коде, который вы показываете. Сервер находится на другой стороне, где этот 7000 порт открыт для ws:// протокола. Итак, убедитесь, что вы понимаете и правильно настроили все части. Например, я не понимаю, зачем вам @RestController в вашем Listener классе...

Привет Артем. Спасибо за ответ. Я исправил restcontroller, так как он был неправильным. Я пытаюсь получить сообщения от kafka, а затем отправить их в rsocket. Что бы вы посоветовали, как это сделать? Я хочу использовать spring для чтения сообщений от kafka и отправки их для реагирования (javascript)

meuhedet meuhedet 23.12.2020 15:58

Вы идете правильным путем с RSocketRequester. Только то, что вы упускаете, является subscribe() к этому send() результату. вы также можете ознакомиться с последней версией Spring Cloud Stream с функциональным API, а также изучить этот проект: spring.io/projects/spring-cloud-stream-applications. У него есть приложение RScoket Sink для таких задач, как ваша. Вместе с Kafka Binder вы достигнете своей цели без написания кода!

Artem Bilan 23.12.2020 16:03

Привет еще раз. Я пытаюсь использовать этот. Похоже, то, что мне нужно. spring.io/blog/2020/08/03/…. Но я получаю сообщение об ошибке. Поле rsocketConsumer в com.healthservices.streaming.client.listener.Listener требует bean-компонента типа «org.springframework.cglib.core.internal.Function», который не может быть найден. Попытка ввести rsocketConsumer

meuhedet meuhedet 23.12.2020 16:26

Почему вы пишете собственное приложение? Что не так с этим RSocket Sink из коробки? В любом случае, я думаю, что вам не хватает этого spring.cloud.function.definition=rsocketConsumer в application.properties для вашей пользовательской раковины.

Artem Bilan 23.12.2020 16:50

@meuhedetmeuhedet, это полезный ответ. Вам следует подумать о том, чтобы принять его (используя значок V и обязательно проголосовать за него).

royB 23.12.2020 20:02

я вижу, что он срабатывает с помощью rsocketConsumer.apply(val).subscribe(); Проверяем контроллером. Но не вижу этого в приложении реакции

meuhedet meuhedet 24.12.2020 10:32

Вероятно, это отдельная история, которая заслуживает отдельной ветки SO...

Artem Bilan 24.12.2020 19:34
stackoverflow.com/questions/65463438/…
meuhedet meuhedet 27.12.2020 07:38

Другие вопросы по теме