Как использовать десериализацию protobuf в строках запроса KSQLDB?

Я пытаюсь стандартизировать десериализацию в своем приложении KSQLDB на Java, но не могу понять, как обрабатывать тип Row, возвращаемый типом KSQLDB Client. Ex (попробовать/уловить удалено):

    import io.confluent.ksql.api.client.Client;
    import io.confluent.ksql.api.client.BatchedQueryResult;

    Client ksqldbClient = kafkaService.getKSQLDBClient();
    String queryString = String.format("SELECT * FROM %s WHERE %s = '%s';", tableName, primaryKeyName, id);
    BatchedQueryResult query = ksqldbClient.executeQuery(queryString);
    List<Row> rows = query.get();

Мои таблицы KSQLDB настроены на использование сериализации protobuf, но похоже, что тип Row — это JSON? Я могу только получить его данные через:

    for (Row row : rows) {
        String json = row.asObject().toJsonString();
        // Deserialize json string
        ...
    }

Выполняет ли клиент KSQLDB десериализацию protobuf самостоятельно? Есть ли способ получить только байты protobuf, чтобы я мог передать их в мой десериализатор Protobuf, который я уже определил, поэтому мне не нужно также писать десериализатор JSON?

Он уже десериализован. Вы можете получить определенные поля из row.asObject(), например, getString, getDouble и т. д. Зачем вам нужны байты или строка JSON?

OneCricketeer 09.12.2020 19:47

Хорошо, спасибо за разъяснение. Не стесняйтесь публиковать ответ с некоторыми примерами, чтобы претендовать на репутацию.

ChosunOne 09.12.2020 22:09
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
2
344
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

row.asObject() возвращает KsqlObject, который уже десериализован и работает аналогично набору результатов JDBC, поскольку вы можете вызывать для него различные методы получения для типов в строке.

Если вы хотите сопоставить конкретный объект домена, который вы бы сгенерировали из Protobuf, похоже, прямого пути нет, и вам, вероятно, лучше использовать Kafka Streams напрямую, а не KSQL, если вам нужна эта функция.

Другие вопросы по теме

Задержка начала потребления сообщений
Я пытаюсь построить коннектор снежинки для Кафки. Есть ли способ, которым коннектор обрабатывает такие события, как обновление, удаление и создание?
Приложение Kafka и Python в Kubernetes в отдельных модулях — NoBrokersAvailable()
Confluent Schema Registry — максимальное количество схем
NotEnoughReplicasException: сообщения отклоняются, так как синхронизированных реплик меньше, чем требуется
В указанном файле конфигурации JAAS не найден раздел конфигурации JAAS с именем «Клиент»
Разница между 3 приложениями setConcurrency(1) и 1 приложением setConcurrency(3)
Настройка Nifi для использования с Kafa в Kubernetes с использованием Helm в VirtualBox
Невозможно запустить PySpark (от Kafka до Delta) локально и получить исключение SparkException: не удается найти класс подключаемого модуля каталога для каталога «spark_catalog»
Понимание варианта использования свойства max.in.flight.request в Kafka