Я хочу работать с обработкой в реальном времени Kafka Streams в моем проекте весенней загрузки. Итак, мне нужна конфигурация Kafka Streams, или я хочу использовать KStreams или KTable, но я не смог найти пример в Интернете.
Я назначил производителя и потребителя, теперь я хочу транслировать в реальном времени.




вы можете создать новый проект весенней загрузки с нуля, используя https://start.spring.io/ соответственно выберите необходимые версии / зависимости и сгенерируйте / загрузите проект.
вы можете приступить к реализации методов kstream api (https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/kstream/KStream.html)
Позвольте мне начать с того, что если вы новичок в потоках Kafka, добавление Spring-boot поверх него добавляет еще один уровень сложности, а потоки Kafka требуют большого обучения как есть. Вот основы, которые помогут вам начать: пом:
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.10.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>${kafka.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>${kafka.version}</version>
</dependency>
Теперь объект конфигурации. В приведенном ниже коде предполагается, что вы создаете два потоковых приложения, и имейте в виду, что каждое приложение представляет свою собственную топологию обработки:
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.core.StreamsBuilderFactoryBean;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaStreamConfig {
@Value("${delivery-stats.stream.threads:1}")
private int threads;
@Value("${delivery-stats.kafka.replication-factor:1}")
private int replicationFactor;
@Value("${messaging.kafka-dp.brokers.url:localhost:9092}")
private String brokersUrl;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public StreamsConfig kStreamsConfigs() {
Map<String, Object> config = new HashMap<>();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "default");
setDefaults(config);
return new StreamsConfig(config);
}
public void setDefaults(Map<String, Object> config) {
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokersUrl);
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, FailOnInvalidTimestamp.class);
}
@Bean("app1StreamBuilder")
public StreamsBuilderFactoryBean app1StreamBuilderFactoryBean() {
Map<String, Object> config = new HashMap<>();
setDefaults(config);
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, threads);
config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, replicationFactor);
return new StreamsBuilderFactoryBean(config);
}
@Bean("app2StreamBuilder")
public StreamsBuilderFactoryBean app2StreamBuilderFactoryBean() {
Map<String, Object> config = new HashMap<>();
setDefaults(config);
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app2");
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, threads);
config.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, replicationFactor);
return new StreamsBuilderFactoryBean(config);
}
}
Теперь самое интересное - использовать streamsBuilder для создания вашего приложения (app1 в этом примере).
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class App1 {
@SuppressWarnings("unchecked")
@Bean("app1StreamTopology")
public KStream<String, Long> startProcessing(@Qualifier("app1StreamBuilder") StreamsBuilder builder) {
final KStream<String, Long> toSquare = builder.stream("toSquare", Consumed.with(Serdes.String(), Serdes.Long()));
toSquare.map((key, value) -> { // do something with each msg, square the values in our case
return KeyValue.pair(key, value * value);
}).to("squared", Produced.with(Serdes.String(), Serdes.Long())); // send downstream to another topic
return toSquare;
}
}
Надеюсь это поможет.
Команды Kafka для создания темы и отправки данных в тему
Создать тему: kafka-themes.bat --zookeeper localhost: 2181 --create --topic toSquare --replication-factor 1 --partitions 1
Отправить данные в тему: kafka-console-maker --broker-list localhost: 9092 --topic testStreamsIn --property parse.key = true --property key.separator =, тест, 12345678
Могу я спросить, зачем вам возвращать KStream, который не используется в приложении? Почему вы не можете использовать это в аннотации постконструкций?
Вы определяете bean-компонент app1StreamTopology, но как этот bean-компонент подключается при запуске приложения. Я не вижу, как его внедряют куда-либо, поэтому Spring Kafka собирает все компоненты типа KStream, а затем применяет потоковый вход?
Другой способ инициализировать приложение Kafka Streams в Spring Boot можно найти по адресу
https://gist.github.com/itzg/e3ebfd7aec220bf0522e23a65b1296c8
Этот подход использует bean-компонент KafkaStreams, вызывающий kafkaStreams.start (), который может быть загружен как Bean-компонент Topology, так и StreamBuilder.
Ссылка на решение приветствуется, но убедитесь, что ваш ответ полезен и без нее: добавить контекст вокруг ссылки, чтобы ваши друзья-пользователи имели некоторое представление о том, что это такое и почему оно есть, а затем процитируйте наиболее релевантную часть страницы, на которую вы ссылаетесь. если целевая страница недоступна. Ответы, которые представляют собой не более чем ссылку, могут быть удалены.
Простой способ начать работу с Kafka Streams при загрузке Spring:
Загрузите свой проект с помощью https://start.spring.io. Выберите Облачный поток и Spring для Apache Kafka Streams как зависимости. Вот ссылка на предварительно настроенный шаблон проекта: https://start.spring.io/#!language=java&dependencies=kafka-streams,cloud-stream
Определите bean-компонент KStream в своем приложении. Например, это очень простое потребительское приложение. Он просто потребляет данные и записывает записи из KStream в стандартный вывод.
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Main.class, args);
}
@Bean
public java.util.function.Consumer<KStream<String, String>> process() {
return stream -> stream.foreach((key, value) -> {
System.out.println(key + ":" + value);
});
}
}
В этом приложении мы определили одну привязку ввода. Spring создаст эту привязку с именем process-in-0, то есть именем функции bean-компонента, за которым следует -in-, за которым следует порядковый номер параметра. Это имя привязки используется для установки других свойств, таких как имя темы. Например, spring.cloud.stream.bindings.process-in-0.destination=my-topic.
См. Дополнительные примеры здесь - Spring Cloud Stream Kafka Binder Reference, раздел Programming Model.
Настройте application.yaml следующим образом:
spring:
cloud:
stream:
bindings:
process-in-0.destination: my-topic
kafka:
streams:
binder:
applicationId: my-app
brokers: localhost:9092
configuration:
default:
key:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value:
serde: org.apache.kafka.common.serialization.Serdes$StringSerde
process-in - это name of the method + input, поэтому он становится process-in аналогично, как только вы закончите обработку дампа данных в выходной класс, будет process-out
Вы самые лучшие.