Потоки Akka — почему контекст не сбрасывается?

Я работаю над чтением данных из Kafka и столкнулся с этим кодом для возврата сведений о соединении Kafka, но я не понимаю, как распределяется контекст. Вот класс для настройки KafkaConnection:

import akka.actor.typed.ActorSystem;
import akka.kafka.ConsumerMessage;
import akka.kafka.ConsumerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Consumer;
import akka.stream.javadsl.SourceWithContext;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Deserializer;

import java.time.Duration;

@Getter
@Slf4j
public final class KafkaSource<K, V> {

    private final SourceWithContext<ConsumerRecord<K, V>, ConsumerMessage.Committable, ?> commitableSource;

    @Builder
    private KafkaSource(final Deserializer<K> keyd, final Deserializer<V> valueDeserializer, final ActorSystem actorSystem) {

        final String kafkaBootstrapServers = "localhost:9092";

        final ConsumerSettings<K, V> kafkaConsumerSettings =
                ConsumerSettings.create(actorSystem, keyd, valueDeserializer)
                        .withBootstrapServers(kafkaBootstrapServers)
                        .withGroupId("testGroup12")
                        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
                        .withStopTimeout(Duration.ofSeconds(5));

        final String topics = "request-topic";

        this.commitableSource = Consumer.sourceWithOffsetContext(kafkaConsumerSettings,
                Subscriptions.topics(topics)).mapContext(ctx -> ctx);
    }
}

Вот поток Akka для обработки данных из Kafka, который я написал:

import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadFromKafka {

    private static ObjectMapper objectMapper = new ObjectMapper();

    public static void main(String args[]) {

        final ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "as");

        var ksource = KafkaSource.<String, String>builder()
                .actorSystem(actorSystem)
                .keyd(new StringDeserializer()).valueDeserializer(new StringDeserializer())
                .build();

        ksource.getCommitableSource()
                .map(ConsumerRecord::value)
                .map(x -> {
                            var mappedObject = objectMapper.readValue(x, RequestDto.class);
                            System.out.println("mappedObject is :" + mappedObject);
                            return mappedObject;
                        }
                )
                .log("error")
                .asSource()
                .map(pair -> pair.second().commitInternal())
                .run(actorSystem);
    }
}

Сопоставляемый класс RequestDto :

import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.*;
import lombok.extern.jackson.Jacksonized;

import java.util.Date;

@Jacksonized
@AllArgsConstructor
@Getter
@Builder
@ToString
public class RequestDto {

    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:sss")
    private final Date datePurchased;

    private String someOtherField;

}

Хотя ReadFromKafka работает так, как ожидалось, почему ConsumerMessage.Committable from SourceWithContext<ConsumerRecord<K, V>, ConsumerMessage.Committable, ?> не удаляется при выполнении:

.map(ConsumerRecord::value)
.map(x -> {
            var mappedObject = objectMapper.readValue(x, RequestDto.class);
            System.out.println("mappedObject is :" + mappedObject);
            return mappedObject;
        }
) 

.asSource() позволяет получить доступ к контексту внутри кортежа во второй позиции, чтобы затем зафиксировать смещение, используя:

.map(pair -> pair.second().commitInternal())

Я не понимаю, как это работает, кажется, в фоновом режиме происходит что-то неявное, что позволяет распространять контекст по всему потоку?

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
0
29
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

SourceWithContext<A, B, M> определяет потоковые операции, которые он поддерживает, чтобы работать только с A частью значения.

Итак, если f — это функция, принимающая A и возвращающая C, .map(f) приводит к SourceWithContext<C, B, M>.

Под капотом это Source<Pair<A, B>, M>. map можно было бы определить как-то так (как всегда извиняюсь за ужасную Java):

private Source<Pair<A, B>, M> underlying;

public <C> SourceWithContext<C, B, M> map(Function<A, C> f) {
    Source<Pair<C, B>, M> src =
        underlying.map(pair -> {
            A a = pair.first();
            C c = f(a);
            return Pair.of<C, B>(c, pair.second()); // no idea if this is the correct Java syntax, but you get the idea
        })
    return SourceWithContext.fromPairs<C, B, M>(src);
}

Обратите внимание, что f никогда не увидит second часть Pair. До тех пор, пока каждая операция работает правильно в контексте, она просто работает.

Есть операции, где нет однозначного «правильного решения». Примером этого является операция, которая может переупорядочивать элементы.

Другие вопросы по теме