Я пытаюсь создать производитель Kafka внутри функции Lambda с поддержкой однократной доставки, позволяющей отправлять сообщения в MSK.
Обновлено: MSK IAM Auth используется для протокола безопасности между Kafka и клиентами.
Однако, несмотря на то, что (я думаю) я правильно настроил все конфигурации, Producer по-прежнему не может писать сообщения в MSK.
Производитель зависает при вызове init_transactions() и выводит в цикле следующие отладочные сообщения:
%7|1708348719.300|TXNCOORD|lambda#producer-1| [thrd:main]: Unable to query for transaction coordinator: Coordinator query timer: No brokers available for Transactions (3 broker(s) known)
2024-02-19T14:18:39.338+01:00 %7|1708348719.338|CONNECT|lambda#producer-1| [thrd:TxnCoordinator]: TxnCoordinator: broker in state TRY_CONNECT connecting
2024-02-19T14:18:39.338+01:00 %7|1708348719.338|CONNECT|lambda#producer-1| [thrd:TxnCoordinator]: TxnCoordinator: broker has no address yet: postponing connect
2024-02-19T14:18:39.800+01:00 %7|1708348719.800|CONNECT|lambda#producer-1| [thrd:main]: Cluster connection already in progress: acquire ProducerID
2024-02-19T14:18:39.800+01:00 %7|1708348719.800|PIDBROKER|lambda#producer-1| [thrd:main]: No brokers available for Transactions (3 broker(s) known)
Я пробовал менять количество брокеров (с 2 до 4 - не получилось), игрался со значениями настроек транзакции.state.log.replication.factor, транзакции.state.log.min.isr, offsets.topic. replication.factor (даже выставив их все в 1 - не помогло). Советы из этой темы тоже не помогли Проблемы с настройкой Amazon MSK по умолчанию и публикацией с транзакциями
У меня есть следующие конфигурации и настройки для кластера AWS MSK:
Конфигурации кластера:
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
offsets.topic.replication.factor=3
min.insync.replicas=2
default.replication.factor=3
auto.create.topics.enable=true
num.io.threads=8
num.network.threads=2
num.partitions=1
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
zookeeper.session.timeout.ms=18000
Конфигурации производителя:
{
"client.id": "some_id",
"acks": "all",
"enable.idempotence": "true",
"transactional.id": "123",
}
P.S. Если я не использую acks, Enable.idempotence иtransactional.id — Producer работает нормально, но это превосходит цель поднятия этой проблемы в первую очередь.
UPD: Покопавшись в логах MSK-брокеров, кажется, что соединение не устанавливается - понятия не имею, почему, тем более, что метод аутентификации один и тот же для транзакционного и нетранзакционного Producer, а для нетранзакционного Producer соединение устанавливается просто отлично..

AWS IAM Auth для MSK каким-то образом мешает вызову init_transactions(), потому что не использовать его, а просто использовать PLAINTEXT работает нормально. Не знаю, почему именно это не работает с IAM Auth, возможно, кто-то еще подскажет. В настоящее время в этом варианте использования нет возможности использовать AWS IAM Auth.