Как установить PartitionKey при использовании Spring Cloud Stream Kinesis

В своем коде я использовал setHeader.

mysource.getChannel1()
        .send(MessageBuilder
        .withPayload(new Person("messageA", 1))
        .setHeader("partitionKey", 345).build());

В файле свойств я добавил:

spring.cloud.stream.bindings.channel1.producer.partitionKeyExpression = 
                                                      headers['partitionKey']

Но все же PartitionKey - это не 345, а partitionKey - это какое-то хеш-значение 2133325211. Даже если я вставляю 2 сообщения с одинаковым заголовком partitionKey, в Kinesis мы получаем 2 разных ключа разделов.

Когда я пытаюсь

spring.cloud.stream.bindings.output.producer.partitionKeyExpression = payload.id

partitionKey всегда равен partitionKey-0

Мой вопрос:

Как мне установить ключ раздела на определенное значение?

Почему ты не принимаешь правильный ответ

Moose on the Loose 06.05.2019 20:27
3
1
1 198
1

Ответы 1

Проблема в том, что текущая реализация полагается на встроенный алгоритм в SCSt для BinderHeaders.PARTITION_HEADER, который создает partition number, что не соответствует природе Kinesis, как мы должны выбирать конкретный сегмент. Ну, вообще-то мы его вообще не выбираем. Мы предоставляем некоторое значение ключа раздела, чтобы сообщение могло располагаться в том же сегменте по хешу из этого значения. Или мы можем предоставить явный хеш, чтобы быть уверенным, что мы переходим к одному и тому же осколку. По сути, это то же самое в конце концов - мы получаем осколок по хешу.

Чтобы он работал для вашего варианта использования payload.id, я предлагаю взглянуть на подход заголовка BinderHeaders.PARTITION_OVERRIDE:

@Bean
@GlobalChannelInterceptor(order = Integer.MIN_VALUE, patterns = Source.OUTPUT)
public ChannelInterceptor partitionOverrideInterceptor(BindingProperties bindingProperties,
        StandardEvaluationContext evaluationContext) {

    return new ChannelInterceptorAdapter() {

        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            return MessageBuilder.fromMessage(message)
                    .setHeader(BinderHeaders.PARTITION_HEADER,
                            bindingProperties.getProducer()
                                    .getPartitionKeyExpression()
                                    .getValue(evaluationContext, message))
                    .build();
        }

    };
}

Таким образом, заголовок scst_partition будет иметь точное значение, которое вы хотели бы передать через ваш partitionKeyExpression, а KinesisMessageHandler будет иметь правильное значение для целевого хеширования в PutRecordRequest.

См. https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/issues/52 для получения дополнительной информации.

Другие вопросы по теме