В нашем приложении мы используем Spring Integration 5.1.4 и spring-boot-starter-integration 2.1.4. Мы используем конфигурацию XML для удобства просмотра графика интеграции. Теперь нам нужно прочитать сообщения из темы kafka, поэтому мы хотим использовать последнюю версию spring-integration-kafka 3.1.2.RELEASE и адаптер входящего канала kafka. Я мог найти примеры конфигураций xml, используя версии spring-integration-kafka 1.x, но не смог найти конфигурацию xml для последних версий? Если я использую более старую конфигурацию xml с версией 3.x, она выдает ошибку «невозможно найти объявление для элемента int-kafka: zookeeper-connect». Может ли кто-нибудь помочь нам указать нам, что не так с матрицей совместимости версий, или предоставить некоторые пример конфигурации xml для адаптера входящего канала 3.1.2 kafka для чтения из темы kafka.
<int-kafka:zookeeper-connect
id = "zookeeperConnect" zk-connect = "localhost:2181"
zk-connection-timeout = "6000" zk-session-timeout = "6000"
zk-sync-time = "2000" />
<int-kafka:inbound-channel-adapter
id = "kafkaInboundChannelAdapter"
kafka-consumer-context-ref = "consumerContext" auto-startup = "true"
channel = "inputFromKafka">
<int:poller fixed-delay = "2000" time-unit = "MILLISECONDS" />
</int-kafka:inbound-channel-adapter>
<bean id = "consumerProperties"
class = "org.springframework.beans.factory.config.PropertiesFactoryBean">
<property name = "properties">
<props>
<prop key = "auto.offset.reset">smallest</prop>
<prop key = "socket.receive.buffer.bytes">10485760</prop> <!-- 10M -->
<prop key = "fetch.message.max.bytes">5242880</prop>
<prop key = "auto.commit.interval.ms">1000</prop>
</props>
</property>
</bean>
<int-kafka:consumer-context
id = "consumerContext" consumer-timeout = "1000"
zookeeper-connect = "zookeeperConnect"
consumer-properties = "consumerProperties">
<int-kafka:consumer-configurations>
<int-kafka:consumer-configuration
group-id = "Group1" max-messages = "5000"
key-decoder = "deccoder" value-decoder = "deccoder">
<int-kafka:topic id = "Helloworld-Topic" streams = "3" />
</int-kafka:consumer-configuration>
</int-kafka:consumer-configurations>
</int-kafka:consumer-context>
<bean id = "deccoder"
class = "org.springframework.integration.kafka.serializer.common.StringDecoder" />
См. документация (глава в справочнике Spring для Apache Kafka).
<int-kafka:message-driven-channel-adapter
id = "kafkaListener"
listener-container = "container1"
auto-startup = "false"
phase = "100"
send-timeout = "5000"
mode = "record"
retry-template = "template"
recovery-callback = "callback"
error-message-strategy = "ems"
channel = "someChannel"
error-channel = "errorChannel" />
<bean id = "container1" class = "org.springframework.kafka.listener.KafkaMessageListenerContainer">
<constructor-arg>
<bean class = "org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key = "bootstrap.servers" value = "localhost:9092" />
...
</map>
</constructor-arg>
</bean>
</constructor-arg>
<constructor-arg>
<bean class = "org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg name = "topics" value = "foo" />
</bean>
</constructor-arg>
</bean>
Не задавайте новые вопросы в комментариях к старым ответам; всегда задавайте новый вопрос и показывайте свой код/конфигурацию. Какую версию ты используешь? Это было добавлено в версии 3.2.
Мне просто было интересно, откуда оно взялось. Я пытался понять этот вопрос и не нашел его в 3.1.2. в любом случае спасибо за ваш ответ.
Как я уже сказал, он не был добавлен до версии 3.2.
У меня проблема с тем, что адаптер канала, управляемый сообщениями, недоступен в весенней интеграции kafka xsd. Моя IDE всегда жалуется на отсутствие определения. Так же при запуске приложение вылетает с ошибкой пока не может найти определение. Я что-то пропустил?