Я пытаюсь протестировать прослушиватель Spring Kafka в тесте Spring Boot, используя @EmbeddedKafka
. Однако я продолжаю сталкиваться со следующим исключением:
No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
@Component
@Slf4j
public class CancelAuthorizationLinkageListener {
private final CancelAuthorizationLinkageProcessor cancelAuthorizationLinkageProcessor;
private final CancelAuthorizationLinkageServiceInterface cancelAuthorizationLinkageService;
private final KafkaTemplate<String, CancelAuthorizationLinkageResource> kafkaTemplate;
private final String retryTopic;
public CancelAuthorizationLinkageListener(CancelAuthorizationLinkageProcessor cancelAuthorizationLinkageProcessor,
CancelAuthorizationLinkageServiceInterface cancelAuthorizationLinkageService,
KafkaTemplate<String, CancelAuthorizationLinkageResource> kafkaTemplate,
@Value("${spring.kafka.producer.retry-topic}") String retryTopic) {
this.cancelAuthorizationLinkageProcessor = cancelAuthorizationLinkageProcessor;
this.cancelAuthorizationLinkageService = cancelAuthorizationLinkageService;
this.kafkaTemplate = kafkaTemplate;
this.retryTopic = retryTopic;
}
@Bean
public RecordMessageConverter converter() {
return new JsonMessageConverter();
}
@Bean
public BatchMessagingMessageConverter batchConverter() {
return new BatchMessagingMessageConverter(converter());
}
@KafkaListener(id = "${spring.kafka.consumer.properties.cancel-authorization-linkage-listener-id}",
topics = "${spring.kafka.consumer.linkage-topic}", autoStartup = "false",
batch = "true",
groupId = "group1", concurrency = "2")
public void listen(List<CancelAuthorizationLinkageResource> cancelAuthorizationLinkageResources) {
for (CancelAuthorizationLinkageResource cancelAuthorizationLinkageResource : cancelAuthorizationLinkageResources) {
try {
CancelAuthorizationLinkageWriterResource cancelAuthorizationLinkageWriterResource =
cancelAuthorizationLinkageProcessor.process(cancelAuthorizationLinkageResource);
if (cancelAuthorizationLinkageWriterResource != null) {
cancelAuthorizationLinkageService.linkageAuthorization(
cancelAuthorizationLinkageWriterResource.getApiResource());
}
} catch (Exception e) {
log.error("listener error: {}", e.getMessage());
kafkaTemplate.send(retryTopic, cancelAuthorizationLinkageResource.getAuthorizationId(),
cancelAuthorizationLinkageResource);
}
}
}
Мой слушатель принимает сообщения, и если какое-либо из этих сообщений не удается обработать, я возвращаю их в ту же тему для повторной попытки. Таким образом, мой слушатель одновременно потребляет и производит сообщения. Я должен обеспечить транзакционный характер этого процесса, чтобы предотвратить случаи, когда неудавшиеся сообщения не возвращаются в тему повтора, но смещение для использованного сообщения все еще фиксируется. Это приведет к невозможности повторить неудачные сообщения.
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT,
properties = {"spring.batch.job.name=cancelAuthorizationLinkageJob",
"bootstrap-servers: ${spring.embedded.kafka.brokers}"})
@DirtiesContext
@EmbeddedKafka(
partitions = 5, topics = {"${spring.kafka.consumer.linkage-topic}", "ppcd.cushion.cancel.auth.retry"},
count = 3)
class CancelAuthorizationLinkageListenerTest {
@Autowired
private CancelAuthorizationLinkageListener cancelAuthorizationLinkageListener;
@Mock
private CancelAuthorizationLinkageProcessor cancelAuthorizationLinkageProcessor;
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Autowired
private ConsumerFactory<String, CancelAuthorizationLinkageResource> consumerFactory;
@Value("${spring.kafka.consumer.linkage-topic}")
private String linkageTopic;
@Value("${spring.kafka.producer.retry-topic}")
private String retryTopic;
private Consumer<String, CancelAuthorizationLinkageResource> consumer;
@BeforeEach
public void setUp() {
consumer = consumerFactory.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, retryTopic);
}
@Test
@DisplayName("OK-取消オーソリ処理中エラーが起きた場合、retryトピックへ送信する")
void of_ok_1() throws Exception {
// init
int ngNumber = 1;
AtomicInteger atomicInteger = new AtomicInteger(0);
// mock
doThrow(new InvalidValueException("test")).when(cancelAuthorizationLinkageProcessor).process(any());
// verify
cancelAuthorizationLinkageListener.listen(List.of(createCancelAuthorizationLinkageResource(true)));
await()
.atMost(2, SECONDS)
.pollInterval(1, SECONDS)
.untilAsserted(() -> {
KafkaTestUtils.getRecords(consumer).records(retryTopic)
.forEach(x -> atomicInteger.incrementAndGet());
assertEquals(ngNumber, atomicInteger.get());
});
}
Я пытаюсь протестировать прослушиватель Spring Kafka в тесте Spring Boot, используя @EmbeddedKafka
. Прослушиватель выглядит правильно настроенным с транзакциями в производственном коде (я не совсем уверен), но в контексте тестовой среды, хотя я использую @Autowired
, прослушиватель не работает внутри транзакции.
Может ли кто-нибудь объяснить, почему это происходит и как обеспечить транзакционное поведение и в тестовом контексте?
spring:
profiles:
active: "local"
application:
name:
batch:
initialize-schema: ALWAYS
job:
names:
#enable: false
kafka:
bootstrap-servers: localhost:9092
producer:
acks: -1
transaction-id-prefix: cushion-kafka-tx-${random.uuid}
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
retry-topic: ppcd.cushion.cancel.auth.retry
# retries: 5
consumer:
group-id: groupid-Dev
auto-offset-reset: earliest
max-poll-records: 20
value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
properties:
cancel-authorization-linkage-listener-id: cancel-authorization-linkage-listener
test-cancel-authorization-linkage-listener-id: test-cancel-authorization-linkage-listener
spring.json.trusted.packages: '*'
isolation.level: read_committed
linkage-topic: ppcd.matching.credit.auth.cancel.auto.matched.result.cushion
При использовании Spring Boot необходимо только установить свойство spring.kafka.producer.transaction-id-prefix
— Spring Boot автоматически настроит bean-компонент KafkaTransactionManager
и подключит его к контейнеру прослушивателя. Согласно документации Spring, я считаю, что я правильно настроил транзакции.
Итак, либо следуйте рекомендациям из этой ошибки, либо просто не используйте свойство transaction-id-prefix
для конфигурации вашего производителя.
Потому что у вас там нет управления транзакциями: docs.spring.io/spring-framework/reference/testing/…
Большое спасибо за Ваш ответ. Я постарался описать проблему более подробно. Я хочу уточнить, нет ли проблем с моей текущей конфигурацией производственного кода. Если рабочий код в порядке, хотя я использую @Autowired, почему транзакция не активируется автоматически в контексте тестовой среды?