В настоящее время я участвую в проекте, в котором используются JPA и Kafka. Я пытаюсь найти набор хороших практик для объединения этих операций.
В существующем коде производитель используется в той же транзакции, что и jpa, однако из того, что я прочитал, кажется, что они не разделяют транзакцию.
@PostMapping
@Transactional
public XDto createX(@RequestBody XRequest request) {
Xdto dto = xService.create(request);
kafkaProducer.putToQueue(dto, Type.CREATE);
return dto;
}
где производитель кафки определяется следующим образом:
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Type> template;
public void putToQueue(Dto dto, Type eventType) {
template.send("event", new Event(dto, eventType));
}
}
Это допустимый вариант использования для объединения jpa и kafka, правильно ли определены границы транзакции?




это не будет работать должным образом, когда транзакция завершится неудачно. Взаимодействие с кафкой не является частью транзакции.
Возможно, вы захотите взглянуть на TransactionalEventListener. Возможно, вы захотите написать сообщение в kafka о событии AFTER_COMMIT. даже тогда публикация кафки может закончиться неудачей.
Другой вариант - писать в db, используя jpa, как вы это делаете. Пусть Debezium прочитает обновленные данные из вашей базы данных и отправит их в kafka. Мероприятие будет в другом формате, но гораздо более насыщенным.
Вы не должны помещать отправляющее сообщение в kafka в транзакцию. Если вам нужна логика, когда не удается отправить событие в kafka, а затем откатить транзакцию, в этом случае будет лучше использовать spring-retry. Просто поместите код, связанный с отправкой события в kafka, в аннотированный метод @Retryable, а также добавьте аннотированный метод @Recover с логикой возврата изменений в БД, сделанных ранее.
Глядя на ваш вопрос, я предполагаю, что вы пытаетесь достичь CDC (Change Data Capture) своей системы OLTP, то есть регистрировать каждое изменение, которое поступает в базу данных транзакций. Есть два подхода к этому.
Если CDC - ваш вариант использования, попробуйте использовать любое из уже доступных решений.
Как уже говорили другие, вы можете использовать сбор данных об изменениях для безопасного распространения изменений, примененных к вашей базе данных, в Apache Kafka. Вы не можете обновить базу данных и Kafka за одну транзакцию, поскольку последняя не поддерживает какой-либо протокол двухфазной фиксации.
Вы можете либо CDC сами таблицы, либо, если вы хотите иметь больше контроля над структурой, отправляемой в Kafka, применить шаблон «исходящие». В этом случае ваше приложение будет писать в свои фактические бизнес-таблицы, а также в таблицу «исходящие», которая содержит сообщения для отправки в Kafka. Вы можете найти подробное описание этого подхода в этом Сообщение блога.
Отказ от ответственности: я являюсь автором этого сообщения и руководителем Debezium, одного из решений CDC, упомянутых в некоторых других ответах.
Вы пытаетесь добиться отслеживания измененных данных в системе OLTP?