Я создаю данные для Kafka в формате AVRO.
Я создал схему с помощью
curl --location 'https://confluent-schema-registry.example.com/subjects/production.iot.device.avro-value/versions' \
--header 'Content-Type: application/vnd.schemaregistry.v1+json' \
--data '{
"schema": "{\"type\": \"record\", \"name\": \"device\", \"fields\":[{ \"name\": \"status\", \"type\": \"string\"},{ \"name\": \"location\", \"type\": \"string\"},{ \"name\": \"type\", \"type\": \"string\"},{ \"name\": \"temperature\", \"type\": \"long\"},{ \"name\": \"humidity\", \"type\": \"double\"},{ \"name\": \"battery\", \"type\": \"long\"},{ \"name\": \"signal_strength\", \"type\": \"long\"},{ \"name\": \"mode\", \"type\": \"string\"},{ \"name\": \"active\", \"type\": \"boolean\"}]}"
}'
я использую
Ниже приведен код, который может успешно записать данные в Kafka:
#include <avro.h>
#include <glib.h>
#include <librdkafka/rdkafka.h>
#include <libserdes/serdes-avro.h>
#include <libserdes/serdes.h>
#include <signal.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "config.c"
#define ARR_SIZE(arr) (sizeof((arr)) / sizeof((arr[0])))
static volatile bool is_running = true;
static void delivery_report(rd_kafka_t *kafka_handle,
const rd_kafka_message_t *rkmessage, void *opaque) {
if (rkmessage->err) {
g_error("Message delivery failed: %s", rd_kafka_err2str(rkmessage->err));
}
}
void signal_handler(int signal) {
if (signal == SIGINT || signal == SIGTERM) {
is_running = false;
}
}
int main(int argc, char **argv) {
rd_kafka_t *producer;
rd_kafka_conf_t *conf;
serdes_conf_t *serdes_conf;
serdes_t *serdes;
char errstr[512];
if (argc != 2) {
g_error("Usage: %s <config.ini>", argv[0]);
}
const char *config_file = argv[1];
g_autoptr(GError) error = NULL;
g_autoptr(GKeyFile) key_file = g_key_file_new();
if (!g_key_file_load_from_file(key_file, config_file, G_KEY_FILE_NONE, &error)) {
g_error("Error loading config file: %s", error->message);
}
conf = rd_kafka_conf_new();
load_config_group(conf, key_file, "default");
rd_kafka_conf_set(conf, "queue.buffering.max.messages", "10000000", NULL, 0);
rd_kafka_conf_set(conf, "queue.buffering.max.kbytes", "10485760", NULL, 0);
rd_kafka_conf_set(conf, "batch.size", "65536", NULL, 0);
rd_kafka_conf_set(conf, "linger.ms", "5", NULL, 0);
rd_kafka_conf_set_dr_msg_cb(conf, delivery_report);
producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!producer) {
g_error("Failed to create new producer: %s", errstr);
}
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
serdes_conf = serdes_conf_new(
NULL, 0, "schema.registry.url",
"https://confluent-schema-registry.example.com", NULL);
serdes = serdes_new(serdes_conf, errstr, sizeof(errstr));
if (!serdes) {
g_error("Failed to create serdes instance: %s", errstr);
}
const char *topic = "production.iot.device.avro";
const char *schema_name = "production.iot.device.avro-value";
serdes_schema_t *serdes_schema =
serdes_schema_get(serdes, schema_name, -1, errstr, sizeof(errstr));
if (!serdes_schema) {
g_error("Failed to retrieve AVRO schema: %s", errstr);
}
const char *device_ids[6] = {"device1", "device2", "device3",
"device4", "device5", "device6"};
const char *status_list[3] = {"online", "offline", "maintenance"};
const char *locations[3] = {"locationA", "locationB", "locationC"};
const char *types[3] = {"type1", "type2", "type3"};
srandom(time(NULL)); // Seed the random number generator
while (is_running) {
const char *key = device_ids[random() % ARR_SIZE(device_ids)];
const char *status = status_list[random() % ARR_SIZE(status_list)];
const char *location = locations[random() % ARR_SIZE(locations)];
const char *type = types[random() % ARR_SIZE(types)];
double temperature = ((double)random() / RAND_MAX) * 100.0 - 50.0;
double humidity = ((double)random() / RAND_MAX);
int battery = random() % 101;
int signal_strength = random() % 101;
const char *mode = (random() % 2) ? "manual" : "auto";
bool active = (random() % 2);
avro_schema_t schema = serdes_schema_avro(serdes_schema);
avro_value_iface_t *record_class = avro_generic_class_from_schema(schema);
avro_value_t record;
avro_generic_value_new(record_class, &record);
avro_value_t field;
if (avro_value_get_by_name(&record, "status", &field, NULL) == 0) {
avro_value_set_string(&field, status);
}
if (avro_value_get_by_name(&record, "location", &field, NULL) == 0) {
avro_value_set_string(&field, location);
}
if (avro_value_get_by_name(&record, "type", &field, NULL) == 0) {
avro_value_set_string(&field, type);
}
if (avro_value_get_by_name(&record, "temperature", &field, NULL) == 0) {
avro_value_set_long(&field, temperature);
}
if (avro_value_get_by_name(&record, "humidity", &field, NULL) == 0) {
avro_value_set_double(&field, humidity);
}
if (avro_value_get_by_name(&record, "battery", &field, NULL) == 0) {
avro_value_set_long(&field, battery);
}
if (avro_value_get_by_name(&record, "signal_strength", &field, NULL) == 0) {
avro_value_set_long(&field, signal_strength);
}
if (avro_value_get_by_name(&record, "mode", &field, NULL) == 0) {
avro_value_set_string(&field, mode);
}
if (avro_value_get_by_name(&record, "active", &field, NULL) == 0) {
avro_value_set_boolean(&field, active);
}
void *avro_payload = NULL;
size_t avro_size;
serdes_err_t serr = serdes_schema_serialize_avro(serdes_schema, &record, &avro_payload, &avro_size, errstr, sizeof(errstr));
if (serr != SERDES_ERR_OK) {
g_error("Failed to serialize data: %s", serdes_err2str(serr));
}
rd_kafka_resp_err_t err;
err = rd_kafka_producev(producer, RD_KAFKA_V_TOPIC(topic),
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
RD_KAFKA_V_KEY((void *)key, strlen(key)),
RD_KAFKA_V_VALUE(avro_payload, avro_size),
RD_KAFKA_V_OPAQUE(NULL), RD_KAFKA_V_END);
if (err) {
g_error("Failed to produce to topic %s: %s", topic,
rd_kafka_err2str(err));
}
free(avro_payload);
avro_value_decref(&record);
rd_kafka_poll(producer, 0);
g_usleep(50); // μs
}
g_message("Flushing final messages ...");
rd_kafka_flush(producer, 10 * 1000);
if (rd_kafka_outq_len(producer) > 0) {
g_error("%d message(s) were not delivered", rd_kafka_outq_len(producer));
}
g_message("Producer stopped.");
rd_kafka_destroy(producer);
serdes_schema_destroy(serdes_schema);
serdes_destroy(serdes);
return 0;
}
Теперь, для полной совместимости схемы, я постарался сделать каждое поле необязательным. Я удалил старую схему и воссоздал новую с помощью
curl --location 'https://confluent-schema-registry.example.com/subjects/production.iot.device.avro-value/versions' \
--header 'Content-Type: application/vnd.schemaregistry.v1+json' \
--data '{
"schema": "{\"type\": \"record\", \"name\": \"device\", \"fields\":[{ \"name\": \"status\", \"type\": [\"null\", \"string\"], \"default\": null},{ \"name\": \"location\", \"type\": [\"null\", \"string\"], \"default\": null},{ \"name\": \"type\", \"type\": [\"null\", \"string\"], \"default\": null},{ \"name\": \"temperature\", \"type\": [\"null\", \"long\"], \"default\": null},{ \"name\": \"humidity\", \"type\": [\"null\", \"double\"], \"default\": null},{ \"name\": \"battery\", \"type\": [\"null\", \"long\"], \"default\": null},{ \"name\": \"signal_strength\", \"type\": [\"null\", \"long\"], \"default\": null},{ \"name\": \"mode\", \"type\": [\"null\", \"string\"], \"default\": null},{ \"name\": \"active\", \"type\": [\"null\", \"boolean\"], \"default\": null}]}"
}'
Но теперь тот же код даст сбой в строке serdes_err_t serr = serdes_schema_serialize_avro(schema, &record, &avro_payload, &avro_size, errstr, sizeof(errstr));
и вернет ошибку:
Не удалось сериализовать данные: сбой сериализатора.
Согласно моим исследованиям, «Сбой сериализатора» соответствует SERDES_ERR_SERIALIZER , и эта часть в библиотеке libserdes вызвала эту ошибку. Функция avro_value_sizeof
внутри не удалась, и вот avro_value_sizeof
исходный код.
На основе этой информации я добавил этот раздел для воспроизведения:
int aerr = avro_value_sizeof(&record, &avro_size);
if (aerr) {
printf("%s\n", strerror(aerr));
}
Я вижу, что это распечатывается
Неверный аргумент
Однако я не уверен, что вызвало эту проблему. Любой гид будет признателен, спасибо!
В моем исходном коде я добавил
void print_serdes_schema(serdes_schema_t *serdes_schema) {
if (!serdes_schema) {
printf("serdes_schema is NULL.\n");
return;
}
const char *schema_name = serdes_schema_name(serdes_schema);
if (schema_name) {
printf("schema_name: %s\n", schema_name);
} else {
printf("Failed to retrieve schema_name.\n");
}
int schema_id = serdes_schema_id(serdes_schema);
printf("schema_id: %d\n", schema_id);
const char *schema_definition = serdes_schema_definition(serdes_schema);
if (schema_definition) {
printf("serdes_schema: %s\n", schema_definition);
} else {
printf("Failed to retrieve serdes_schema.\n");
}
}
print_schema_info(serdes_schema);
возвращает:
schema_name: production.iot.device.avro-value
schema_id: 17
serdes_schema: {"type":"record","name":"device","fields":[{"name":"status","type":["null","string"],"default":null},{"name":"location","type":["null","string"],"default":null},{"name":"type","type":["null","string"],"default":null},{"name":"temperature","type":["null","long"],"default":null},{"name":"humidity","type":["null","double"],"default":null},{"name":"battery","type":["null","long"],"default":null},{"name":"signal_strength","type":["null","long"],"default":null},{"name":"mode","type":["null","string"],"default":null},{"name":"active","type":["null","boolean"],"default":null}]}
Это означает, что функция serdes_schema_get
работает хорошо.
В моем исходном коде я добавил
void print_avro_schema(avro_schema_t schema) {
char schema_str [8192];
avro_writer_t writer = avro_writer_memory(schema_str, 8192);
avro_schema_to_json(schema, writer);
printf("schema: %s\n", schema_str);
}
print_avro_schema(schema);
распечатаю
{"type":"record","name":"device","fields":[{"name":"status","type":[{"type":"null"},{"type":"string"}]},{"name":"location","type":[{"type":"null"},{"type":"string"}]},{"name":"type","type":[{"type":"null"},{"type":"string"}]},{"name":"temperature","type":[{"type":"null"},{"type":"long"}]},{"name":"humidity","type":[{"type":"null"},{"type":"double"}]},{"name":"battery","type":[{"type":"null"},{"type":"long"}]},{"name":"signal_strength","type":[{"type":"null"},{"type":"long"}]},{"name":"mode","type":[{"type":"null"},{"type":"string"}]},{"name":"active","type":[{"type":"null"},{"type":"boolean"}]}]}
Обратите внимание, что внутри нет "default":null
.
Когда я создаю локальную схему
const char LOCAL_SCHEMA_WITH_OPTIONAL[] = "{\"type\": \"record\", \"name\": \"hongbo_test\", \"fields\":[{ \"name\": \"status\", \"type\": [\"null\", \"string\"], \"default\": null},{ \"name\": \"location\", \"type\": [\"null\", \"string\"], \"default\": null},{ \"name\": \"type\", \"type\": [\"null\", \"string\"], \"default\": null},{ \"name\": \"temperature\", \"type\": [\"null\", \"long\"], \"default\": null},{ \"name\": \"humidity\", \"type\": [\"null\", \"double\"], \"default\": null},{ \"name\": \"battery\", \"type\": [\"null\", \"long\"], \"default\": null},{ \"name\": \"signal_strength\", \"type\": [\"null\", \"long\"], \"default\": null},{ \"name\": \"mode\", \"type\": [\"null\", \"string\"], \"default\": null},{ \"name\": \"active\", \"type\": [\"null\", \"boolean\"], \"default\": null}]}";
avro_schema_t local_schema;
if (avro_schema_from_json_literal(LOCAL_SCHEMA_WITH_OPTIONAL, &local_schema)) {
fprintf(stderr, "Unable to parse person schema\n");
return 1;
}
print_avro_schema(local_schema);
Он печатает точно так же. Итак, я думаю "default":null
не включена avro_schema_t
ожидается переменная типа (?)
Теперь, чтобы воспроизвести ошибку, код можно упростить до
const char LOCAL_SCHEMA_WITH_OPTIONAL[] = "{\"type\": \"record\", \"name\": \"hongbo_test\", \"fields\":[{ \"name\": \"status\", \"type\": [\"null\", \"string\"], \"default\": null},{ \"name\": \"location\", \"type\": [\"null\", \"string\"], \"default\": null},{ \"name\": \"type\", \"type\": [\"null\", \"string\"], \"default\": null},{ \"name\": \"temperature\", \"type\": [\"null\", \"long\"], \"default\": null},{ \"name\": \"humidity\", \"type\": [\"null\", \"double\"], \"default\": null},{ \"name\": \"battery\", \"type\": [\"null\", \"long\"], \"default\": null},{ \"name\": \"signal_strength\", \"type\": [\"null\", \"long\"], \"default\": null},{ \"name\": \"mode\", \"type\": [\"null\", \"string\"], \"default\": null},{ \"name\": \"active\", \"type\": [\"null\", \"boolean\"], \"default\": null}]}";
// const char LOCAL_SCHEMA_WITHOUT_OPTIONAL[] = "{\"type\": \"record\", \"name\": \"hongbo_test\", \"fields\":[{ \"name\": \"status\", \"type\": \"string\"},{ \"name\": \"location\", \"type\": \"string\"},{ \"name\": \"type\", \"type\": \"string\"},{ \"name\": \"temperature\", \"type\": \"long\"},{ \"name\": \"humidity\", \"type\": \"double\"},{ \"name\": \"battery\", \"type\": \"long\"},{ \"name\": \"signal_strength\", \"type\": \"long\"},{ \"name\": \"mode\", \"type\": \"string\"},{ \"name\": \"active\", \"type\": \"boolean\"}]}";
avro_schema_t local_schema;
if (avro_schema_from_json_literal(LOCAL_SCHEMA_WITH_OPTIONAL, &local_schema)) {
fprintf(stderr, "Unable to parse person schema\n");
return 1;
}
print_avro_schema(local_schema);
avro_value_iface_t *record_class = avro_generic_class_from_schema(local_schema);
avro_value_t field;
avro_value_t record;
avro_generic_value_new(record_class, &record);
if (avro_value_get_by_name(&record, "status", &field, NULL) == 0) {
if (avro_value_set_string(&field, status) != 0) {
printf("avro_value_set_string failed\n");
}
}
if (avro_value_get_by_name(&record, "location", &field, NULL) == 0) {
if (avro_value_set_string(&field, location) != 0) {
printf("avro_value_set_string failed\n");
}
}
if (avro_value_get_by_name(&record, "type", &field, NULL) == 0) {
if (avro_value_set_string(&field, type) != 0) {
printf("avro_value_set_string failed\n");
}
}
if (avro_value_get_by_name(&record, "temperature", &field, NULL) == 0) {
if (avro_value_set_long(&field, temperature) != 0) {
printf("avro_value_set_long failed\n");
}
}
if (avro_value_get_by_name(&record, "humidity", &field, NULL) == 0) {
if (avro_value_set_double(&field, humidity) != 0) {
printf("avro_value_set_double failed\n");
}
}
if (avro_value_get_by_name(&record, "battery", &field, NULL) == 0) {
if (avro_value_set_long(&field, battery) != 0) {
printf("avro_value_set_long failed\n");
}
}
if (avro_value_get_by_name(&record, "signal_strength", &field, NULL) == 0) {
if (avro_value_set_long(&field, signal_strength) != 0) {
printf("avro_value_set_long failed\n");
}
}
if (avro_value_get_by_name(&record, "mode", &field, NULL) == 0) {
if (avro_value_set_string(&field, mode) != 0) {
printf("avro_value_set_string failed\n");
}
}
if (avro_value_get_by_name(&record, "active", &field, NULL) == 0) {
if (avro_value_set_boolean(&field, active) != 0) {
printf("avro_value_set_boolean failed\n");
}
}
// For debugging
char *record_str;
avro_value_to_json(&record, 1, &record_str);
printf("record: %s\n", record_str);
void *avro_payload = NULL;
size_t avro_size;
// For debugging, this `avro_value_sizeof` is where it failed inside next function `serdes_schema_serialize_avro`
int aerr = avro_value_sizeof(&record, &avro_size);
if (aerr) {
printf("avro_value_sizeof: %s\n", strerror(aerr));
}
Когда я использую LOCAL_SCHEMA_WITHOUT_OPTIONAL
, он напечатает что-то вроде
record: {"status": "online", "location": "locationA", "type": "type1", "temperature": 38, "humidity": 0.76082854986229376, "battery": 40, "signal_strength": 84, "mode": "auto", "active": true}
Когда я использую LOCAL_SCHEMA_WITH_OPTIONAL
, он напечатает что-то вроде
avro_value_set_string failed
avro_value_set_string failed
avro_value_set_string failed
avro_value_set_long failed
avro_value_set_double failed
avro_value_set_long failed
avro_value_set_long failed
avro_value_set_string failed
avro_value_set_boolean failed
record: (null)
avro_value_sizeof: Invalid argument
Если ожидается "default":null
не включенная avro_schema_t
переменная типа, возникает вопрос, как создать правильную record
, когда необязательное значение может быть нулевым.
Привет @OneCricketeer, извини за опечатку, когда я редактировал информацию. Я исправил это в своем вопросе. Функция serdes_schema_get
работает хорошо. Я также добавил результат в вопрос.
Я нашел ошибку: в основном, когда значение может быть необязательным, это тип Union (первый тип в Union — null
, второй тип — string
, long
, double
, boolean
и т. д.).
Мне нужно добавить дополнительный avro_value_set_branch
шаг.
Вот прототип функции avro_value_set_branch
:
int avro_value_set_branch(avro_value_t *union_val, int discriminant, avro_value_t *branch);
По сути, эта часть кода
if (avro_value_get_by_name(&record, "status", &field, NULL) == 0) {
avro_value_set_string(&field, status);
}
if (avro_value_get_by_name(&record, "location", &field, NULL) == 0) {
avro_value_set_string(&field, location);
}
// ...
нужно обновить до чего-то вроде
if (avro_value_get_by_name(&record, "status", &field, NULL) == 0) {
if (avro_value_set_branch(&field, 1, &field) == 0) {
avro_value_set_string(&field, status);
}
}
if (avro_value_get_by_name(&record, "location", &field, NULL) == 0) {
if (avro_value_set_branch(&field, 1, &field) == 0) {
avro_value_set_string(&field, location);
}
}
// ...
Теперь данные можно успешно записать в Kafka:
Вызов Schema_get работает должным образом? Название вашей темы заканчивается на json, но в теме схемы написано avro.