Не удалось декодировать тип json в весеннем облачном потоке DefaultKafkaHeaderMapper

Мы используем spring-cloud-stream и планируем обновить нашу версию Kafka.
Наши приложения, использующие spring-cloud-stream:2.0.0 (spring-kafka 2.1.7) с сервером apache kafka 1.0.1 а также использовать spring-cloud-sleuth:2.0.0 для отслеживания.
Мы собираемся обновить наш сервер Kafka до версии 2.3.0, поэтому требуется обновление до spring-boot 2.2.x (Hoxton) с spring-cloud-sleuth:2.2.0 и spring-cloud-stream:3.0.3 (Horsham.SR3).
У нас есть около 200 приложений, которые используют Kafka, поэтому обновление будет происходить постепенно, поэтому в качестве промежуточного состояния у нас будут производители, использующие более новую версию, и потребители, использующие старую версию. Наши потребители используют @StreamListener.

Во время наших тестов мы столкнулись с проблемой парсинга большинства заголовков с типом String и получения следующего:

ERROR 27448 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$4  : Could not decode json type: ecb89ccb3e79418b for key: X-B3-TraceId
com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ecb89ccb3e79418b': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"ecb89ccb3e79418b"; line: 1, column: 33]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:679) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3526) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2621) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:826) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:723) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4141) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4000) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3091) ~[jackson-databind-2.9.6.jar:2.9.6]
    at org.springframework.kafka.support.DefaultKafkaHeaderMapper.lambda$toHeaders$1(DefaultKafkaHeaderMapper.java:233) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at java.lang.Iterable.forEach(Iterable.java:75) ~[na:1.8.0_221]
    at org.springframework.kafka.support.DefaultKafkaHeaderMapper.toHeaders(DefaultKafkaHeaderMapper.java:216) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4.toHeaders(KafkaMessageChannelBinder.java:554) ~[spring-cloud-stream-binder-kafka-2.0.0.RELEASE.jar:2.0.0.RELEASE]
    at org.springframework.kafka.support.converter.MessagingMessageConverter.toMessage(MessagingMessageConverter.java:106) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:229) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
...

В то время как заголовок типов:

{spanTraceId=java.lang.String, spanId=java.lang.String, spanParentSpanId=java.lang.String, nativeHeaders=org.springframework.util.LinkedMultiValueMap, X-B3-SpanId=java.lang.String, X-B3-ParentSpanId=java.lang.String, scst_partition=java.lang.Integer, X-B3-Sampled=java.lang.String, X-B3-TraceId=java.lang.String, spanSampled=java.lang.String, contentType=java.lang.String}

Например, X-B3-SpanId, который был добавлен Sleuth, имеет тип String и значение: ecb89ccb3e79418b, которое не является строкой JSON, поэтому ObjectMapper терпит неудачу при преобразовании в String Object здесь:

headers.put(h.key(), getObjectMapper().readValue(h.value(), type))

Похоже, он не должен использовать ObjectMapper, когда у нас есть типы String, поэтому наши старые потребители терпят неудачу.

Есть ли способ предотвратить эту проблему при использовании нового производителя и старого потребителя?

Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
4
0
1 976
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Вы можете настроить DefaultKafkaHeaderMapper для совместимости со старыми версиями:

    /**
     * Set to true to encode String-valued headers as JSON ("..."), by default just the
     * raw String value is converted to a byte array using the configured charset. Set to
     * true if a consumer of the outbound record is using Spring for Apache Kafka version
     * less than 2.3
     * @param encodeStrings true to encode (default false).
     * @since 2.3
     */
    public void setEncodeStrings(boolean encodeStrings) {
        this.encodeStrings = encodeStrings;
    }

Также см.

spring.cloud.stream.kafka.binder.headerMapperBeanName

Привет @gary, это предложение не работает. Мы попытались производить с использованием Hoxton.SR3 с установкой encodeStrings flag = true на DefaultKafkaHeaderMapper, как было предложено. При такой настройке потребитель может обрабатывать все заголовки String, но не может обрабатывать заголовок типа содержимого. Значение типа этого заголовка было изменено на org.springframework.util.MimeType (из-за изменения флага), а значение — «application/json», поэтому он пытается преобразовать его в JSON и терпит неудачу здесь: headers.put(h.key(), getObjectMapper().readValue(h.value(), type)); с NPE.

Yuval Simhon 20.12.2020 08:08

Попробуйте использовать BinderHeaderMapper с установленным свойством вместо DefaultKafkaHeaderMapper; изначально это был клон, но я вижу там дополнительный код для работы с MimeType.

Gary Russell 21.12.2020 15:38

Спасибо @gary - BinderHeaderMapper сделал это, никаких других проблем, связанных с заголовками, начинающимися с spring-kafka:2.3.7

Yuval Simhon 04.01.2021 10:16

Другие вопросы по теме