Я работаю над чтением данных из Kafka и столкнулся с этим кодом для возврата сведений о соединении Kafka, но я не понимаю, как распределяется контекст. Вот класс для настройки KafkaConnection:
import akka.actor.typed.ActorSystem;
import akka.kafka.ConsumerMessage;
import akka.kafka.ConsumerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Consumer;
import akka.stream.javadsl.SourceWithContext;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import java.time.Duration;
@Getter
@Slf4j
public final class KafkaSource<K, V> {
private final SourceWithContext<ConsumerRecord<K, V>, ConsumerMessage.Committable, ?> commitableSource;
@Builder
private KafkaSource(final Deserializer<K> keyd, final Deserializer<V> valueDeserializer, final ActorSystem actorSystem) {
final String kafkaBootstrapServers = "localhost:9092";
final ConsumerSettings<K, V> kafkaConsumerSettings =
ConsumerSettings.create(actorSystem, keyd, valueDeserializer)
.withBootstrapServers(kafkaBootstrapServers)
.withGroupId("testGroup12")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.withStopTimeout(Duration.ofSeconds(5));
final String topics = "request-topic";
this.commitableSource = Consumer.sourceWithOffsetContext(kafkaConsumerSettings,
Subscriptions.topics(topics)).mapContext(ctx -> ctx);
}
}
Вот поток Akka для обработки данных из Kafka, который я написал:
import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
public class ReadFromKafka {
private static ObjectMapper objectMapper = new ObjectMapper();
public static void main(String args[]) {
final ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "as");
var ksource = KafkaSource.<String, String>builder()
.actorSystem(actorSystem)
.keyd(new StringDeserializer()).valueDeserializer(new StringDeserializer())
.build();
ksource.getCommitableSource()
.map(ConsumerRecord::value)
.map(x -> {
var mappedObject = objectMapper.readValue(x, RequestDto.class);
System.out.println("mappedObject is :" + mappedObject);
return mappedObject;
}
)
.log("error")
.asSource()
.map(pair -> pair.second().commitInternal())
.run(actorSystem);
}
}
Сопоставляемый класс RequestDto :
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.*;
import lombok.extern.jackson.Jacksonized;
import java.util.Date;
@Jacksonized
@AllArgsConstructor
@Getter
@Builder
@ToString
public class RequestDto {
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:sss")
private final Date datePurchased;
private String someOtherField;
}
Хотя ReadFromKafka
работает так, как ожидалось, почему ConsumerMessage.Committable from SourceWithContext<ConsumerRecord<K, V>, ConsumerMessage.Committable, ?>
не удаляется при выполнении:
.map(ConsumerRecord::value)
.map(x -> {
var mappedObject = objectMapper.readValue(x, RequestDto.class);
System.out.println("mappedObject is :" + mappedObject);
return mappedObject;
}
)
.asSource() позволяет получить доступ к контексту внутри кортежа во второй позиции, чтобы затем зафиксировать смещение, используя:
.map(pair -> pair.second().commitInternal())
Я не понимаю, как это работает, кажется, в фоновом режиме происходит что-то неявное, что позволяет распространять контекст по всему потоку?
SourceWithContext<A, B, M>
определяет потоковые операции, которые он поддерживает, чтобы работать только с A
частью значения.
Итак, если f
— это функция, принимающая A
и возвращающая C
, .map(f)
приводит к SourceWithContext<C, B, M>
.
Под капотом это Source<Pair<A, B>, M>
. map
можно было бы определить как-то так (как всегда извиняюсь за ужасную Java):
private Source<Pair<A, B>, M> underlying;
public <C> SourceWithContext<C, B, M> map(Function<A, C> f) {
Source<Pair<C, B>, M> src =
underlying.map(pair -> {
A a = pair.first();
C c = f(a);
return Pair.of<C, B>(c, pair.second()); // no idea if this is the correct Java syntax, but you get the idea
})
return SourceWithContext.fromPairs<C, B, M>(src);
}
Обратите внимание, что f
никогда не увидит second
часть Pair
. До тех пор, пока каждая операция работает правильно в контексте, она просто работает.
Есть операции, где нет однозначного «правильного решения». Примером этого является операция, которая может переупорядочивать элементы.