Я пытаюсь стандартизировать десериализацию в своем приложении KSQLDB на Java, но не могу понять, как обрабатывать тип Row
, возвращаемый типом KSQLDB Client
. Ex (попробовать/уловить удалено):
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.BatchedQueryResult;
Client ksqldbClient = kafkaService.getKSQLDBClient();
String queryString = String.format("SELECT * FROM %s WHERE %s = '%s';", tableName, primaryKeyName, id);
BatchedQueryResult query = ksqldbClient.executeQuery(queryString);
List<Row> rows = query.get();
Мои таблицы KSQLDB настроены на использование сериализации protobuf, но похоже, что тип Row
— это JSON? Я могу только получить его данные через:
for (Row row : rows) {
String json = row.asObject().toJsonString();
// Deserialize json string
...
}
Выполняет ли клиент KSQLDB десериализацию protobuf самостоятельно? Есть ли способ получить только байты protobuf, чтобы я мог передать их в мой десериализатор Protobuf, который я уже определил, поэтому мне не нужно также писать десериализатор JSON?
Хорошо, спасибо за разъяснение. Не стесняйтесь публиковать ответ с некоторыми примерами, чтобы претендовать на репутацию.
row.asObject()
возвращает KsqlObject, который уже десериализован и работает аналогично набору результатов JDBC, поскольку вы можете вызывать для него различные методы получения для типов в строке.
Если вы хотите сопоставить конкретный объект домена, который вы бы сгенерировали из Protobuf, похоже, прямого пути нет, и вам, вероятно, лучше использовать Kafka Streams напрямую, а не KSQL, если вам нужна эта функция.
Он уже десериализован. Вы можете получить определенные поля из
row.asObject()
, например,getString
,getDouble
и т. д. Зачем вам нужны байты или строка JSON?