Хорошая практика при использовании kafka с jpa

В настоящее время я участвую в проекте, в котором используются JPA и Kafka. Я пытаюсь найти набор хороших практик для объединения этих операций.

В существующем коде производитель используется в той же транзакции, что и jpa, однако из того, что я прочитал, кажется, что они не разделяют транзакцию.

@PostMapping
@Transactional
public XDto createX(@RequestBody XRequest request) {
    Xdto dto = xService.create(request);
    kafkaProducer.putToQueue(dto, Type.CREATE);
    return dto;
}

где производитель кафки определяется следующим образом:

public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Type> template;

    public void putToQueue(Dto dto, Type eventType) {
        template.send("event", new Event(dto, eventType));
    }
}

Это допустимый вариант использования для объединения jpa и kafka, правильно ли определены границы транзакции?

Вы пытаетесь добиться отслеживания измененных данных в системе OLTP?

pushpavanthar 05.07.2018 23:01
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
15
1
3 374
4
Перейти к ответу Данный вопрос помечен как решенный

Ответы 4

Ответ принят как подходящий

это не будет работать должным образом, когда транзакция завершится неудачно. Взаимодействие с кафкой не является частью транзакции.

Возможно, вы захотите взглянуть на TransactionalEventListener. Возможно, вы захотите написать сообщение в kafka о событии AFTER_COMMIT. даже тогда публикация кафки может закончиться неудачей.

Другой вариант - писать в db, используя jpa, как вы это делаете. Пусть Debezium прочитает обновленные данные из вашей базы данных и отправит их в kafka. Мероприятие будет в другом формате, но гораздо более насыщенным.

Вы не должны помещать отправляющее сообщение в kafka в транзакцию. Если вам нужна логика, когда не удается отправить событие в kafka, а затем откатить транзакцию, в этом случае будет лучше использовать spring-retry. Просто поместите код, связанный с отправкой события в kafka, в аннотированный метод @Retryable, а также добавьте аннотированный метод @Recover с логикой возврата изменений в БД, сделанных ранее.

Глядя на ваш вопрос, я предполагаю, что вы пытаетесь достичь CDC (Change Data Capture) своей системы OLTP, то есть регистрировать каждое изменение, которое поступает в базу данных транзакций. Есть два подхода к этому.

  1. Код приложения выполняет двойную запись в транзакционную БД, а также в Kafka. Это непоследовательно и мешает работе. Несогласованность, потому что, когда вы выполняете двойную запись в две независимые системы, данные оказываются ошибочными, когда одна из операций записи терпит неудачу, а отправка данных в Kafka в потоке транзакций увеличивает задержку, которую вы не хотите ставить под угрозу.
  2. Извлеките изменения из фиксации БД (триггеры уровня базы данных / приложения или журнал транзакций) и отправьте их в Kafka. Это очень согласованно и совершенно не влияет на вашу транзакцию. Последовательно, потому что журналы фиксации БД являются отражением транзакций БД после успешных фиксаций. Доступно множество решений, использующих этот подход, например шина данных, Максвелл, Debezium и т. д.

Если CDC - ваш вариант использования, попробуйте использовать любое из уже доступных решений.

Как уже говорили другие, вы можете использовать сбор данных об изменениях для безопасного распространения изменений, примененных к вашей базе данных, в Apache Kafka. Вы не можете обновить базу данных и Kafka за одну транзакцию, поскольку последняя не поддерживает какой-либо протокол двухфазной фиксации.

Вы можете либо CDC сами таблицы, либо, если вы хотите иметь больше контроля над структурой, отправляемой в Kafka, применить шаблон «исходящие». В этом случае ваше приложение будет писать в свои фактические бизнес-таблицы, а также в таблицу «исходящие», которая содержит сообщения для отправки в Kafka. Вы можете найти подробное описание этого подхода в этом Сообщение блога.

Отказ от ответственности: я являюсь автором этого сообщения и руководителем Debezium, одного из решений CDC, упомянутых в некоторых других ответах.

Другие вопросы по теме