Поддержка транзакций AWS MSK

Я пытаюсь создать производитель Kafka внутри функции Lambda с поддержкой однократной доставки, позволяющей отправлять сообщения в MSK.

Обновлено: MSK IAM Auth используется для протокола безопасности между Kafka и клиентами.

Однако, несмотря на то, что (я думаю) я правильно настроил все конфигурации, Producer по-прежнему не может писать сообщения в MSK.

Производитель зависает при вызове init_transactions() и выводит в цикле следующие отладочные сообщения:

%7|1708348719.300|TXNCOORD|lambda#producer-1| [thrd:main]: Unable to query for transaction coordinator: Coordinator query timer: No brokers available for Transactions (3 broker(s) known)

2024-02-19T14:18:39.338+01:00   %7|1708348719.338|CONNECT|lambda#producer-1| [thrd:TxnCoordinator]: TxnCoordinator: broker in state TRY_CONNECT connecting

2024-02-19T14:18:39.338+01:00   %7|1708348719.338|CONNECT|lambda#producer-1| [thrd:TxnCoordinator]: TxnCoordinator: broker has no address yet: postponing connect

2024-02-19T14:18:39.800+01:00   %7|1708348719.800|CONNECT|lambda#producer-1| [thrd:main]: Cluster connection already in progress: acquire ProducerID

2024-02-19T14:18:39.800+01:00   %7|1708348719.800|PIDBROKER|lambda#producer-1| [thrd:main]: No brokers available for Transactions (3 broker(s) known)

Я пробовал менять количество брокеров (с 2 до 4 - не получилось), игрался со значениями настроек транзакции.state.log.replication.factor, транзакции.state.log.min.isr, offsets.topic. replication.factor (даже выставив их все в 1 - не помогло). Советы из этой темы тоже не помогли Проблемы с настройкой Amazon MSK по умолчанию и публикацией с транзакциями

У меня есть следующие конфигурации и настройки для кластера AWS MSK:

  • 4 брокера в 2 зонах доступности
  • Кафка 2.8.1
  • Размер кластера: kafka.t3.small (проверил, то же самое происходит и с kafka.m5.large)

Конфигурации кластера:

transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
offsets.topic.replication.factor=3
min.insync.replicas=2
default.replication.factor=3
auto.create.topics.enable=true
num.io.threads=8
num.network.threads=2
num.partitions=1
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
zookeeper.session.timeout.ms=18000

Конфигурации производителя:

  • Использование Confluent Kafka для Python для создания производителя
{
  "client.id": "some_id",
  "acks": "all",
  "enable.idempotence": "true",
  "transactional.id": "123",
}

P.S. Если я не использую acks, Enable.idempotence иtransactional.id — Producer работает нормально, но это превосходит цель поднятия этой проблемы в первую очередь.

UPD: Покопавшись в логах MSK-брокеров, кажется, что соединение не устанавливается - понятия не имею, почему, тем более, что метод аутентификации один и тот же для транзакционного и нетранзакционного Producer, а для нетранзакционного Producer соединение устанавливается просто отлично..

Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
0
0
197
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

AWS IAM Auth для MSK каким-то образом мешает вызову init_transactions(), потому что не использовать его, а просто использовать PLAINTEXT работает нормально. Не знаю, почему именно это не работает с IAM Auth, возможно, кто-то еще подскажет. В настоящее время в этом варианте использования нет возможности использовать AWS IAM Auth.

Другие вопросы по теме