В своем коде я использовал setHeader.
mysource.getChannel1()
.send(MessageBuilder
.withPayload(new Person("messageA", 1))
.setHeader("partitionKey", 345).build());
В файле свойств я добавил:
spring.cloud.stream.bindings.channel1.producer.partitionKeyExpression =
headers['partitionKey']
Но все же PartitionKey - это не 345, а partitionKey - это какое-то хеш-значение 2133325211.
Даже если я вставляю 2 сообщения с одинаковым заголовком partitionKey, в Kinesis мы получаем 2 разных ключа разделов.
Когда я пытаюсь
spring.cloud.stream.bindings.output.producer.partitionKeyExpression = payload.id
partitionKey всегда равен partitionKey-0
Мой вопрос:
Как мне установить ключ раздела на определенное значение?
Проблема в том, что текущая реализация полагается на встроенный алгоритм в SCSt для BinderHeaders.PARTITION_HEADER, который создает partition number, что не соответствует природе Kinesis, как мы должны выбирать конкретный сегмент. Ну, вообще-то мы его вообще не выбираем. Мы предоставляем некоторое значение ключа раздела, чтобы сообщение могло располагаться в том же сегменте по хешу из этого значения. Или мы можем предоставить явный хеш, чтобы быть уверенным, что мы переходим к одному и тому же осколку. По сути, это то же самое в конце концов - мы получаем осколок по хешу.
Чтобы он работал для вашего варианта использования payload.id, я предлагаю взглянуть на подход заголовка BinderHeaders.PARTITION_OVERRIDE:
@Bean
@GlobalChannelInterceptor(order = Integer.MIN_VALUE, patterns = Source.OUTPUT)
public ChannelInterceptor partitionOverrideInterceptor(BindingProperties bindingProperties,
StandardEvaluationContext evaluationContext) {
return new ChannelInterceptorAdapter() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
return MessageBuilder.fromMessage(message)
.setHeader(BinderHeaders.PARTITION_HEADER,
bindingProperties.getProducer()
.getPartitionKeyExpression()
.getValue(evaluationContext, message))
.build();
}
};
}
Таким образом, заголовок scst_partition будет иметь точное значение, которое вы хотели бы передать через ваш partitionKeyExpression, а KinesisMessageHandler будет иметь правильное значение для целевого хеширования в PutRecordRequest.
См. https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/issues/52 для получения дополнительной информации.
Почему ты не принимаешь правильный ответ