Из ровно один раз КИП относительно идемпотентности производителя при перезапуске приложения с помощью InitPidRequest
:
2.1 When an TransactionalId is specified If the transactional.id configuration is set, this TransactionalId passed along with the InitPidRequest, and the mapping to the corresponding PID is logged in the transaction log in step 2a. This enables us to return the same PID for the TransactionalId to future instances of the producer, and hence enables recovering or aborting previously incomplete transactions.
In addition to returning the PID, the InitPidRequest performs the following tasks:
Bumps up the epoch of the PID, so that the any previous zombie instance of the producer is fenced off and cannot move forward with its transaction.
Recovers (rolls forward or rolls back) any transaction left incomplete by the previous instance of the producer. The handling of the InitPidRequest is synchronous. Once it returns, the producer can send data and start new transactions.
Когда производитель терпит неудачу и запускается снова и выполняется InitPidRequest
, в каких ситуациях последняя транзакция «откатывается вперед» (я думаю, это означает совершение) или «откатывается»? Как это контролируется?
Ключевым компонентом, который позволяет Kafka добиться этого, является Координатор сделок. Это было введено как часть KIP, о котором вы упомянули. Координатор транзакций создается брокером как часть процесса инициализации и хранит в памяти следующую информацию:
TransactionalId
до назначенного PID
, текущий номер эпохи (метка времени Unix) и значение времени ожидания транзакцииPID
к текущему текущему статусу транзакции производителя, указанному PID
, темам-разделам участников и времени последнего обновления этого статусаТеперь, чтобы ответить на ваш вопрос о прокрутке вперед или назад транзакции:
Когда производитель выходит из строя и перезапускается, он отправляет новый InitPidRequest
координатору транзакций, если производитель поставляется с непустым TransactionalId
(предоставляется в качестве параметра конфигурации приложением производителя).
Координатор транзакций при получении этого запроса затем проверяет, есть ли уже запись с предоставленным TransactionalId
в отображении в памяти (пункт 1 выше). Если существует сопоставление, он будет искать PID
во второй карте в памяти (пункт 2 выше), чтобы проверить, есть ли какая-либо текущая транзакция против этого PID
:
BEGIN
, то транзакция будет прервана.
(Примечание: Это версия откат)PREPARE_ABORT
, либо в PREPARE_COMMIT
, то координатор транзакций будет ждать, пока транзакция перейдет либо в COMPLETE_ABORT
(версия откат), либо в COMPLETE_COMMIT
(версия перекат вперед).После этого координатор транзакций отвечает последним PID
и отметкой времени эпохи для TransactionalId
, после чего производитель может начать отправку новых транзакций.
Я старался свести объяснения к минимуму, но если вас интересуют подробности, то вот детальный проектный документ для справки.
Надеюсь, это поможет!
@b15 - Спасибо! Я рад, что вы нашли это полезным. :)
Это был фантастический ответ. Спасибо.