Тестирование прослушивателя Spring Kafka в Spring Boot Test с помощью EmbeddedKafka

Я пытаюсь протестировать прослушиватель Spring Kafka в тесте Spring Boot, используя @EmbeddedKafka. Однако я продолжаю сталкиваться со следующим исключением:

No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record

java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record

Моя установка:

  • Версия Spring Boot: 3.2.4

Слушатель:

@Component
@Slf4j
public class CancelAuthorizationLinkageListener {
    private final CancelAuthorizationLinkageProcessor cancelAuthorizationLinkageProcessor;
    private final CancelAuthorizationLinkageServiceInterface cancelAuthorizationLinkageService;
    private final KafkaTemplate<String, CancelAuthorizationLinkageResource> kafkaTemplate;
    private final String retryTopic;

    public CancelAuthorizationLinkageListener(CancelAuthorizationLinkageProcessor cancelAuthorizationLinkageProcessor,
                                              CancelAuthorizationLinkageServiceInterface cancelAuthorizationLinkageService,
                                              KafkaTemplate<String, CancelAuthorizationLinkageResource> kafkaTemplate,
                                              @Value("${spring.kafka.producer.retry-topic}") String retryTopic) {
        this.cancelAuthorizationLinkageProcessor = cancelAuthorizationLinkageProcessor;
        this.cancelAuthorizationLinkageService = cancelAuthorizationLinkageService;
        this.kafkaTemplate = kafkaTemplate;
        this.retryTopic = retryTopic;
    }

    @Bean
    public RecordMessageConverter converter() {
        return new JsonMessageConverter();
    }

    @Bean
    public BatchMessagingMessageConverter batchConverter() {
        return new BatchMessagingMessageConverter(converter());
    }

    @KafkaListener(id = "${spring.kafka.consumer.properties.cancel-authorization-linkage-listener-id}",
            topics = "${spring.kafka.consumer.linkage-topic}", autoStartup = "false",
            batch = "true",
            groupId = "group1", concurrency = "2")
    public void listen(List<CancelAuthorizationLinkageResource> cancelAuthorizationLinkageResources) {
        for (CancelAuthorizationLinkageResource cancelAuthorizationLinkageResource : cancelAuthorizationLinkageResources) {
            try {
                CancelAuthorizationLinkageWriterResource cancelAuthorizationLinkageWriterResource =
                        cancelAuthorizationLinkageProcessor.process(cancelAuthorizationLinkageResource);
                if (cancelAuthorizationLinkageWriterResource != null) {
                    cancelAuthorizationLinkageService.linkageAuthorization(
                            cancelAuthorizationLinkageWriterResource.getApiResource());
                }
            } catch (Exception e) {
                log.error("listener error: {}", e.getMessage());
                kafkaTemplate.send(retryTopic, cancelAuthorizationLinkageResource.getAuthorizationId(),
                        cancelAuthorizationLinkageResource);
            }
        }
    }

Мой слушатель принимает сообщения, и если какое-либо из этих сообщений не удается обработать, я возвращаю их в ту же тему для повторной попытки. Таким образом, мой слушатель одновременно потребляет и производит сообщения. Я должен обеспечить транзакционный характер этого процесса, чтобы предотвратить случаи, когда неудавшиеся сообщения не возвращаются в тему повтора, но смещение для использованного сообщения все еще фиксируется. Это приведет к невозможности повторить неудачные сообщения.

Тестовый код:

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT,
        properties = {"spring.batch.job.name=cancelAuthorizationLinkageJob",
                "bootstrap-servers: ${spring.embedded.kafka.brokers}"})
@DirtiesContext
@EmbeddedKafka(
        partitions = 5, topics = {"${spring.kafka.consumer.linkage-topic}", "ppcd.cushion.cancel.auth.retry"},
        count = 3)
class CancelAuthorizationLinkageListenerTest {

    @Autowired
    private CancelAuthorizationLinkageListener cancelAuthorizationLinkageListener;

    @Mock
    private CancelAuthorizationLinkageProcessor cancelAuthorizationLinkageProcessor;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Autowired
    private ConsumerFactory<String, CancelAuthorizationLinkageResource> consumerFactory;

    @Value("${spring.kafka.consumer.linkage-topic}")
    private String linkageTopic;

    @Value("${spring.kafka.producer.retry-topic}")
    private String retryTopic;

    private Consumer<String, CancelAuthorizationLinkageResource> consumer;

    @BeforeEach
    public void setUp() {
        consumer = consumerFactory.createConsumer();
        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, retryTopic);
    }

    @Test
    @DisplayName("OK-取消オーソリ処理中エラーが起きた場合、retryトピックへ送信する")
    void of_ok_1() throws Exception {
        // init
        int ngNumber = 1;
        AtomicInteger atomicInteger = new AtomicInteger(0);

        // mock
        doThrow(new InvalidValueException("test")).when(cancelAuthorizationLinkageProcessor).process(any());

        // verify
        cancelAuthorizationLinkageListener.listen(List.of(createCancelAuthorizationLinkageResource(true)));

        await()
                .atMost(2, SECONDS)
                .pollInterval(1, SECONDS)
                .untilAsserted(() -> {
                    KafkaTestUtils.getRecords(consumer).records(retryTopic)
                            .forEach(x -> atomicInteger.incrementAndGet());
                    assertEquals(ngNumber, atomicInteger.get());
                });
    }

Я пытаюсь протестировать прослушиватель Spring Kafka в тесте Spring Boot, используя @EmbeddedKafka. Прослушиватель выглядит правильно настроенным с транзакциями в производственном коде (я не совсем уверен), но в контексте тестовой среды, хотя я использую @Autowired, прослушиватель не работает внутри транзакции. Может ли кто-нибудь объяснить, почему это происходит и как обеспечить транзакционное поведение и в тестовом контексте?

приложение.yml:

spring:
  profiles:
    active: "local"
  application:
    name:
  batch:
    initialize-schema: ALWAYS
    job:
      names:
      #enable: false
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      acks: -1
      transaction-id-prefix: cushion-kafka-tx-${random.uuid}
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      retry-topic: ppcd.cushion.cancel.auth.retry

    #      retries: 5
    consumer:
      group-id: groupid-Dev
      auto-offset-reset: earliest
      max-poll-records: 20
      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      properties:
        cancel-authorization-linkage-listener-id: cancel-authorization-linkage-listener
        test-cancel-authorization-linkage-listener-id: test-cancel-authorization-linkage-listener
        spring.json.trusted.packages: '*'
        isolation.level: read_committed
      linkage-topic: ppcd.matching.credit.auth.cancel.auto.matched.result.cushion

При использовании Spring Boot необходимо только установить свойство spring.kafka.producer.transaction-id-prefix — Spring Boot автоматически настроит bean-компонент KafkaTransactionManager и подключит его к контейнеру прослушивателя. Согласно документации Spring, я считаю, что я правильно настроил транзакции.

Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
0
72
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Итак, либо следуйте рекомендациям из этой ошибки, либо просто не используйте свойство transaction-id-prefix для конфигурации вашего производителя.

Большое спасибо за Ваш ответ. Я постарался описать проблему более подробно. Я хочу уточнить, нет ли проблем с моей текущей конфигурацией производственного кода. Если рабочий код в порядке, хотя я использую @Autowired, почему транзакция не активируется автоматически в контексте тестовой среды?

dwb5013 11.07.2024 06:47

Потому что у вас там нет управления транзакциями: docs.spring.io/spring-framework/reference/testing/…

Artem Bilan 11.07.2024 15:44

Другие вопросы по теме

Похожие вопросы

Потребитель Spring Cloud Stream Kafka в пакетном режиме не повторяет попытку
Я получаю сообщение об ошибке: org.apache.kafka.common.errors.InvalidReplicationFactorException: коэффициент репликации: 3 больше, чем доступные брокеры: 1
Потоки Kafka правильно обрабатывают сообщения, но выдают исключение десериализации
Какой неустаревший способ установки менеджера транзакций в контейнер прослушивателя - это Spring Kafka 3.2?
@KafkaListener сContainerFactory не запускается в @SpringBootTest
Проблема с bootstrap.servers в конфигурации Kafka Producer с Spring Boot 3.2.6 и Spring Cloud 2023.0.2
Этот обработчик ошибок не может обрабатывать исключения org.apache.kafka.common.KafkaException; информация о записи отсутствует
@KafkaHandler в потребителе не использует сообщение темы как объектный класс, а только как строку
Утечка памяти Java при создании множества пустых карт
Вызвано: org.messaging.handler.annotation.support.MethodArgumentNotValidException: не удалось разрешить параметр метода с индексом 1 в public void