Я использую интеграцию Spring для обновления БД значениями, поступающими в сообщении.
Входящий канал запрашивает базу данных Mongo, набор результатов должен быть обновлен в соответствии со статусом обработки.
Поток работает, но я не получаю ожидаемого поведения.
Я ожидал, что обработчик сообщений, MongoDbStoringMessageHandler, в конечном итоге выполнит обновление, но похоже, что он полностью переопределяет объект в БД с помощью полезной нагрузки сообщения.
В полезной нагрузке есть не все поля, которые присутствуют в БД.
Есть ли какая-то конфигурация, которую мне не хватает? Или это не типичный вариант использования?
Ниже моя конфигурация:
Источник входящего сообщения
@Bean
public MessageSource mongoMessageSource(MongoDbFactory mongoDbFactory) {
MongoDbMessageSource mongoDbMessageSource = new MongoDbMessageSource(mongoDbFactory,
new LiteralExpression(new BasicDBObject("status","test").toString()));
mongoDbMessageSource.
setCollectionNameExpression(new LiteralExpression("BankAccountDetail"));
mongoDbMessageSource.setEntityClass(AccountBalanceDetails.class);
return mongoDbMessageSource;
}
Выходной канал для указанного выше источника
@Bean
public IntegrationFlow mongoInboundChannel(MessageSource messageSource) {
return IntegrationFlows.from(messageSource).
channel("messageChannel")
.get();
}
Выходной канал:
@Bean
public MessageChannel messageChannel() {
return new DirectChannel();
}
Основной поток: -
@Bean
public IntegrationFlow fromDirect(@Qualifier("messageChannel") MessageChannel messageChannel,
AccountBalanceTransformer accountBalanceTransformer,
MongoDbFactory mongoDbFactory, AccountBalanceUpdater accountBalanceUpdater) {
return IntegrationFlows.from(messageChannel).split().
handle(accountBalanceUpdater, "update").
//handle(System.out::println).
handle(mongoOutput(mongoDbFactory)).
get();
}
Обработчик исходящего сообщения
@Bean
public MessageHandler mongoOutput(MongoDbFactory mongoDbFactory) {
MongoDbStoringMessageHandler mongoDbStoringMessageHandler = new MongoDbStoringMessageHandler(mongoDbFactory);
mongoDbStoringMessageHandler.setCollectionNameExpression(new LiteralExpression("BankAccountDetail"));
return mongoDbStoringMessageHandler;
}
Вот мой модельный класс: -
@Document(collection = "BankAccountDetail")
открытый класс AccountBalanceDetails {
private String id;
private String status;
private String message;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
@Override
public String toString() {
return "AccountBalanceDetails{" +
"id='" + id + '\'' +
", status='" + status + '\'' +
", message='" + message + '\'' +
'}';
}
}




Я смог добиться этого, написав собственный обработчик сообщений.
public class CustomMongoMessageHandler extends AbstractMessageHandler {
@Autowired
private MongoTemplate mongoTemplate;
@Override
protected void handleMessageInternal(Message<?> message) throws Exception {
AccountBalanceDetails accountBalanceDetails = (AccountBalanceDetails) message.getPayload();
Query basicQuery = new Query();
mongoTemplate.updateFirst(basicQuery.addCriteria(Criteria.
where("id").is(accountBalanceDetails.getId())),
Update.update("status", accountBalanceDetails.getStatus()),
AccountBalanceDetails.class);
}
}
Вместо этого:
@Bean
public IntegrationFlow fromDirect(@Qualifier("messageChannel") MessageChannel messageChannel,
AccountBalanceTransformer accountBalanceTransformer,
MongoDbFactory mongoDbFactory, AccountBalanceUpdater accountBalanceUpdater) {
return IntegrationFlows.from(messageChannel).split().
handle(accountBalanceUpdater, "update").
handle(mongoOutput(mongoDbFactory)).
get();
}
Сейчас использую:
@Bean
public IntegrationFlow fromDirect(@Qualifier("messageChannel") MessageChannel messageChannel,
AccountBalanceTransformer accountBalanceTransformer,
MongoDbFactory mongoDbFactory, AccountBalanceUpdater accountBalanceUpdater,
CustomMongoMessageHandler customMongoMessageHandler) {
return IntegrationFlows.from(messageChannel).split().
handle(accountBalanceUpdater, "update").
handle(customMongoMessageHandler()).
get();
}
Это действительно upsert для существующего документа, но поскольку вы предоставляете весь документ, вы в конечном итоге переопределяете все его свойства в соответствии с предоставленными полями.
Вы можете рассмотреть возможность использования MongoDbOutboundGateway (MongoDb.outboundGateway() для Java DSL) с соответствующим CollectionCallback:
/**
* Reference to an instance of {@link CollectionCallback} which specifies the database operation to execute.
* This property is mutually exclusive with {@link #query} and {@link #queryExpression} properties.
* @param collectionCallback the {@link CollectionCallback} instance
* @param <P> the type of the message payload.
* @return the spec
*/
public <P> MongoDbOutboundGatewaySpec collectionCallback(CollectionCallback<P> collectionCallback) {
Я пытаюсь подчеркнуть, что я не предоставляю никаких других свойств, кроме статуса и идентификатора. У меня нет остальных полей, когда может прийти исключение
Правильно, поэтому я предлагаю выполнить тот же partial update, который вы делаете в своем пользовательском коде через упомянутый CollectionCallback в MongoDbOutboundGateway.
Мой фактический вариант использования - получить документ от Mongo и передать его через несколько преобразователей, только первый преобразователь получает исходный объект DB, остальные получают другой объект, который имеет идентификатор объекта DB и другие поля. Когда возникает ошибка, я создаю объект POJO коллекции БД и устанавливаю идентификатор, полученный из сообщения, и статус как ошибку. Когда я отправляю этот частично созданный объект в исходящий канал mongo, я получаю поведение, определенное в вопросе.