Потоки Kafka с Spring Boot

Я хочу работать с обработкой в ​​реальном времени Kafka Streams в моем проекте весенней загрузки. Итак, мне нужна конфигурация Kafka Streams, или я хочу использовать KStreams или KTable, но я не смог найти пример в Интернете.

Я назначил производителя и потребителя, теперь я хочу транслировать в реальном времени.

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Версия Java на основе версии загрузки
Версия Java на основе версии загрузки
Если вы зайдете на официальный сайт Spring Boot , там представлен start.spring.io , который упрощает создание проектов Spring Boot, как показано ниже.
Документирование API с помощью Swagger на Springboot
Документирование API с помощью Swagger на Springboot
В предыдущей статье мы уже узнали, как создать Rest API с помощью Springboot и MySql .
16
0
31 855
4
Перейти к ответу Данный вопрос помечен как решенный

Ответы 4

вы можете создать новый проект весенней загрузки с нуля, используя https://start.spring.io/ соответственно выберите необходимые версии / зависимости и сгенерируйте / загрузите проект.

вы можете приступить к реализации методов kstream api (https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/kstream/KStream.html)

Ответ принят как подходящий

Позвольте мне начать с того, что если вы новичок в потоках Kafka, добавление Spring-boot поверх него добавляет еще один уровень сложности, а потоки Kafka требуют большого обучения как есть. Вот основы, которые помогут вам начать: пом:

<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.1.10.RELEASE</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>${kafka.version}</version>
</dependency>


<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.12</artifactId>
  <version>${kafka.version}</version>
</dependency>


<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>${kafka.version}</version>
</dependency>


<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>connect-api</artifactId>
  <version>${kafka.version}</version>
</dependency>

Теперь объект конфигурации. В приведенном ниже коде предполагается, что вы создаете два потоковых приложения, и имейте в виду, что каждое приложение представляет свою собственную топологию обработки:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.core.StreamsBuilderFactoryBean;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaStreamConfig {

  @Value("${delivery-stats.stream.threads:1}")
  private int threads;

  @Value("${delivery-stats.kafka.replication-factor:1}")
  private int replicationFactor;

  @Value("${messaging.kafka-dp.brokers.url:localhost:9092}")
  private String brokersUrl;


  @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
  public StreamsConfig kStreamsConfigs() {
    Map<String, Object> config = new HashMap<>();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "default");
    setDefaults(config);
    return new StreamsConfig(config);
  }


  public void setDefaults(Map<String, Object> config) {
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokersUrl);
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, FailOnInvalidTimestamp.class);
  }

  @Bean("app1StreamBuilder")
  public StreamsBuilderFactoryBean app1StreamBuilderFactoryBean() {
    Map<String, Object> config = new HashMap<>();
    setDefaults(config);
    config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
    config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
    config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, threads);
    config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, replicationFactor);
    return new StreamsBuilderFactoryBean(config);
  }

  @Bean("app2StreamBuilder")
  public StreamsBuilderFactoryBean app2StreamBuilderFactoryBean() {
    Map<String, Object> config = new HashMap<>();
    setDefaults(config);
    config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app2");
    config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
    config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, threads);
    config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, replicationFactor);
    return new StreamsBuilderFactoryBean(config);
  }
}

Теперь самое интересное - использовать streamsBuilder для создания вашего приложения (app1 в этом примере).

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class App1 {
  @SuppressWarnings("unchecked")
  @Bean("app1StreamTopology")
  public KStream<String, Long> startProcessing(@Qualifier("app1StreamBuilder") StreamsBuilder builder) {

    final KStream<String, Long> toSquare = builder.stream("toSquare", Consumed.with(Serdes.String(), Serdes.Long()));
    toSquare.map((key, value) -> { // do something with each msg, square the values in our case
      return KeyValue.pair(key, value * value);
    }).to("squared", Produced.with(Serdes.String(), Serdes.Long())); // send downstream to another topic

    return toSquare;
  }
}

Надеюсь это поможет.

Команды Kafka для создания темы и отправки данных в тему

Создать тему: kafka-themes.bat --zookeeper localhost: 2181 --create --topic toSquare --replication-factor 1 --partitions 1

Отправить данные в тему: kafka-console-maker --broker-list localhost: 9092 --topic testStreamsIn --property parse.key = true --property key.separator =, тест, 12345678

Вы самые лучшие.

Alpcan Yıldız 25.10.2018 00:26

Могу я спросить, зачем вам возвращать KStream, который не используется в приложении? Почему вы не можете использовать это в аннотации постконструкций?

hudi 10.09.2019 09:41

Вы определяете bean-компонент app1StreamTopology, но как этот bean-компонент подключается при запуске приложения. Я не вижу, как его внедряют куда-либо, поэтому Spring Kafka собирает все компоненты типа KStream, а затем применяет потоковый вход?

Dhiren Dash 05.07.2021 22:14

Другой способ инициализировать приложение Kafka Streams в Spring Boot можно найти по адресу

https://gist.github.com/itzg/e3ebfd7aec220bf0522e23a65b1296c8

Этот подход использует bean-компонент KafkaStreams, вызывающий kafkaStreams.start (), который может быть загружен как Bean-компонент Topology, так и StreamBuilder.

Ссылка на решение приветствуется, но убедитесь, что ваш ответ полезен и без нее: добавить контекст вокруг ссылки, чтобы ваши друзья-пользователи имели некоторое представление о том, что это такое и почему оно есть, а затем процитируйте наиболее релевантную часть страницы, на которую вы ссылаетесь. если целевая страница недоступна. Ответы, которые представляют собой не более чем ссылку, могут быть удалены.

Alessio 20.09.2019 08:48

Простой способ начать работу с Kafka Streams при загрузке Spring:

  1. Загрузите свой проект с помощью https://start.spring.io. Выберите Облачный поток и Spring для Apache Kafka Streams как зависимости. Вот ссылка на предварительно настроенный шаблон проекта: https://start.spring.io/#!language=java&dependencies=kafka-streams,cloud-stream

  2. Определите bean-компонент KStream в своем приложении. Например, это очень простое потребительское приложение. Он просто потребляет данные и записывает записи из KStream в стандартный вывод.

    @SpringBootApplication
    public class Application {
    
        public static void main(String[] args) {
            SpringApplication.run(Main.class, args);
        }
    
        @Bean
        public java.util.function.Consumer<KStream<String, String>> process() {
            return stream -> stream.foreach((key, value) -> {
                System.out.println(key + ":" + value);
            });
        }
    }
    

    В этом приложении мы определили одну привязку ввода. Spring создаст эту привязку с именем process-in-0, то есть именем функции bean-компонента, за которым следует -in-, за которым следует порядковый номер параметра. Это имя привязки используется для установки других свойств, таких как имя темы. Например, spring.cloud.stream.bindings.process-in-0.destination=my-topic.

    См. Дополнительные примеры здесь - Spring Cloud Stream Kafka Binder Reference, раздел Programming Model.

  3. Настройте application.yaml следующим образом:

    spring:
      cloud:
        stream:
          bindings:
            process-in-0.destination: my-topic
          kafka:
            streams:
              binder:
                applicationId: my-app
                brokers: localhost:9092
                configuration:
                  default:
                    key:
                      serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                    value:
                      serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    
process-in - это name of the method + input, поэтому он становится process-in аналогично, как только вы закончите обработку дампа данных в выходной класс, будет process-out
silentsudo 11.04.2020 15:51

Другие вопросы по теме