Я хочу перенести этот код Java в Spring Boot 3:
import java.util.function.BiConsumer;
public void send(String topic, K key, V message, BiConsumer<SendResult<K, V>, Throwable> callback) {
CompletableFuture<SendResult<K, V>> future = kafkaTemplate.send(topic, key, message);
if (Objects.nonNull(callback)) {
future.whenComplete(callback);
}
}
......
// call kafka method
kafkaProducer().send("topic", "key",
"messge", listenableFutureCallback("12345"));
......
private ListenableFutureCallback listenableFutureCallback(String userId) {
return new ListenableFutureCallback() {
@Override
public void onFailure(Throwable ex) {
.......
}
@Override
public void onSuccess(Object result) {
........
}
};
}
Я получаю ошибку в этой строке: listenableFutureCallback("12345")
Cannot find required type
Required type: BiConsumer <org.springframework.kafka.support.SendResult<java.lang.String,java.lang.String>,
java.lang.Throwable>
Provided: ListenableFutureCallback
Знаете ли вы, как мне перенести/заменить ListenableFutureCallback
, чтобы вернуться BiConsumer
из listenableFutureCallback
метода Java?
Вы можете обновить реализацию обратного вызова следующим образом:
import org.springframework.kafka.support.SendResult;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
public void send(String topic, String key, String message, BiConsumer<SendResult<K, V>, Throwable> callback) {
CompletableFuture<SendResult<K, V>> future = kafkaTemplate.send(topic, key, message);
if (Objects.nonNull(callback)) {
future.whenComplete(callback);
}
}
// call kafka method
void test() {
send("topic", "key",
"messge", listenableFutureCallback("12345"));
}
public BiConsumer<SendResult<K, V>, Throwable> listenableFutureCallback(String userId) {
return new BiConsumer<SendResult<K,V>, Throwable>() {
@Override
public void accept(SendResult<K, V> kvSendResult, Throwable throwable) {
if (null != throwable) {
// same flow as of onFailure
} else {
// onSuccess flow
}
}
};
}
Я использую import java.util.function.BiConsumer;
Извини, моя ошибка. Не заметил этого. Обновил ответ. Эту реализацию BiConsumer можно использовать в качестве обратного вызова. Не забудьте также обновить тип возвращаемого значения на BiConsumer<SendResult<K,V>, Throwable>
. Спасибо.
хм... я понимаю Cannot resolve symbol 'V'
. Можете ли вы вставить полный код, пожалуйста?
Конечно, обновил. В случае ошибки, о которой вы упоминаете, пожалуйста, еще раз проверьте, определен ли класс с помощью дженериков K и V, т. е. public class SpringBootApplication<K, V>
. Надеюсь, поможет!
Очевидно, что
ListenableFutureCallback
не расширяется и не напоминаетBiConsumder <org.springframework.kafka.support.SendResult<java.lang.String,java.lang.String>, java.lang.Throwable>
. Так заставь это сделать.