Имя потребительской темы NestJS kafka не определено

Через некоторое время я возвращаюсь к NestJS и пытаюсь настроить простого потребителя Kafka. Вот базовый пример моего контроллера, использующего декоратор EventPattern для подписки на тему Kafka.

import { Controller } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';

@Controller()
export class MyController {
  @EventPattern('my-topic')
  async handleMessage(@Payload() data: Record<string, unknown>) {
    console.info(data);
  }
}

Раньше эта настройка работала для меня, но теперь я столкнулся с некоторыми проблемами. При запуске KafkaJS выдает следующую ошибку:

KafkaJSNonRetriableError: Invalid topic undefined

Мне кажется, название темы не правильно подобрано. Я немного в недоумении, почему это так. Для контекста, вот как я настраивал клиент Kafka в main.ts:

const app = await NestFactory.create(AppModule);
...
app.connectMicroservice({
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: [process.env['KAFKA_BROKER']],
      clientId: process.env['KAFKA_CLIENT_ID'],
    },
    consumer: {
      groupId: process.env['KAFKA_CONSUMER_GROUP_ID'],
      allowAutoTopicCreation: true,
    },
  },
});
...
await app.startAllMicroservices();

Любые идеи будут высоко оценены!

Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Построение конвейеров данных в реальном времени с Apache Kafka: Руководство по Python
Apache Kafka - популярная платформа распределенной потоковой передачи данных, которую можно использовать для построения конвейеров данных в реальном...
0
0
102
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я решил эту проблему, обновив kafkajs до версии 2.2.4.

Другие вопросы по теме

Похожие вопросы

Я получаю сообщение об ошибке: org.apache.kafka.common.errors.InvalidReplicationFactorException: коэффициент репликации: 3 больше, чем доступные брокеры: 1
Потоки Kafka правильно обрабатывают сообщения, но выдают исключение десериализации
Какой неустаревший способ установки менеджера транзакций в контейнер прослушивателя - это Spring Kafka 3.2?
Как использовать Kafka в режиме SASL_SSL
Тайм-аут соединения: невозможно подключить монго к дебезию для CDC
Неожиданный запрос Kafka типа METADATA во время установления связи SASL при подключении потребителя к серверу Kafka
@KafkaHandler в потребителе не использует сообщение темы как объектный класс, а только как строку
Потоковая передача Spark + интеграция с Kafka, чтение данных из Kafka каждые 15 минут и сохранение смещения последнего чтения с помощью PySpark
Ошибка CompletionException в KafkaListener при прослушивании событий формата KafkaAvro
Как Кафка использует отложенную запись?