Есть ли какая-либо конфигурация xml, доступная для адаптера входящего канала в весенней интеграции kafka версии 3.x

В нашем приложении мы используем Spring Integration 5.1.4 и spring-boot-starter-integration 2.1.4. Мы используем конфигурацию XML для удобства просмотра графика интеграции. Теперь нам нужно прочитать сообщения из темы kafka, поэтому мы хотим использовать последнюю версию spring-integration-kafka 3.1.2.RELEASE и адаптер входящего канала kafka. Я мог найти примеры конфигураций xml, используя версии spring-integration-kafka 1.x, но не смог найти конфигурацию xml для последних версий? Если я использую более старую конфигурацию xml с версией 3.x, она выдает ошибку «невозможно найти объявление для элемента int-kafka: zookeeper-connect». Может ли кто-нибудь помочь нам указать нам, что не так с матрицей совместимости версий, или предоставить некоторые пример конфигурации xml для адаптера входящего канала 3.1.2 kafka для чтения из темы kafka.

<int-kafka:zookeeper-connect
    id = "zookeeperConnect" zk-connect = "localhost:2181"
    zk-connection-timeout = "6000" zk-session-timeout = "6000"
    zk-sync-time = "2000" />

<int-kafka:inbound-channel-adapter
    id = "kafkaInboundChannelAdapter"
    kafka-consumer-context-ref = "consumerContext" auto-startup = "true"
    channel = "inputFromKafka">
    <int:poller fixed-delay = "2000" time-unit = "MILLISECONDS" />
</int-kafka:inbound-channel-adapter>

<bean id = "consumerProperties"
    class = "org.springframework.beans.factory.config.PropertiesFactoryBean">
    <property name = "properties">
        <props>
            <prop key = "auto.offset.reset">smallest</prop>
            <prop key = "socket.receive.buffer.bytes">10485760</prop> <!-- 10M -->
            <prop key = "fetch.message.max.bytes">5242880</prop>
            <prop key = "auto.commit.interval.ms">1000</prop>
        </props>
    </property>
</bean>

<int-kafka:consumer-context
    id = "consumerContext" consumer-timeout = "1000"
    zookeeper-connect = "zookeeperConnect"
    consumer-properties = "consumerProperties">
    <int-kafka:consumer-configurations>
        <int-kafka:consumer-configuration
            group-id = "Group1" max-messages = "5000"
            key-decoder = "deccoder" value-decoder = "deccoder">
            <int-kafka:topic id = "Helloworld-Topic" streams = "3" />
        </int-kafka:consumer-configuration>
    </int-kafka:consumer-configurations>
</int-kafka:consumer-context>

<bean id = "deccoder"
    class = "org.springframework.integration.kafka.serializer.common.StringDecoder" />
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Версия Java на основе версии загрузки
Версия Java на основе версии загрузки
Если вы зайдете на официальный сайт Spring Boot , там представлен start.spring.io , который упрощает создание проектов Spring Boot, как показано ниже.
Документирование API с помощью Swagger на Springboot
Документирование API с помощью Swagger на Springboot
В предыдущей статье мы уже узнали, как создать Rest API с помощью Springboot и MySql .
0
0
345
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

См. документация (глава в справочнике Spring для Apache Kafka).

<int-kafka:message-driven-channel-adapter
        id = "kafkaListener"
        listener-container = "container1"
        auto-startup = "false"
        phase = "100"
        send-timeout = "5000"
        mode = "record"
        retry-template = "template"
        recovery-callback = "callback"
        error-message-strategy = "ems"
        channel = "someChannel"
        error-channel = "errorChannel" />

<bean id = "container1" class = "org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg>
        <bean class = "org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <map>
                <entry key = "bootstrap.servers" value = "localhost:9092" />
                ...
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class = "org.springframework.kafka.listener.config.ContainerProperties">
            <constructor-arg name = "topics" value = "foo" />
        </bean>
    </constructor-arg>

</bean>

У меня проблема с тем, что адаптер канала, управляемый сообщениями, недоступен в весенней интеграции kafka xsd. Моя IDE всегда жалуется на отсутствие определения. Так же при запуске приложение вылетает с ошибкой пока не может найти определение. Я что-то пропустил?

Manuel 19.11.2019 14:29

Не задавайте новые вопросы в комментариях к старым ответам; всегда задавайте новый вопрос и показывайте свой код/конфигурацию. Какую версию ты используешь? Это было добавлено в версии 3.2.

Gary Russell 19.11.2019 14:50

Мне просто было интересно, откуда оно взялось. Я пытался понять этот вопрос и не нашел его в 3.1.2. в любом случае спасибо за ваш ответ.

Manuel 20.11.2019 21:34

Как я уже сказал, он не был добавлен до версии 3.2.

Gary Russell 20.11.2019 22:29

Другие вопросы по теме