Возникла ошибка «Ошибка сериализатора» при сериализации в AVRO со схемой, включающей необязательные поля

Я создаю данные для Kafka в формате AVRO.

Я создал схему с помощью

curl --location 'https://confluent-schema-registry.example.com/subjects/production.iot.device.avro-value/versions' \
--header 'Content-Type: application/vnd.schemaregistry.v1+json' \
--data '{
    "schema": "{\"type\": \"record\", \"name\": \"device\", \"fields\":[{ \"name\": \"status\", \"type\": \"string\"},{ \"name\": \"location\", \"type\": \"string\"},{ \"name\": \"type\", \"type\": \"string\"},{ \"name\": \"temperature\", \"type\": \"long\"},{ \"name\": \"humidity\", \"type\": \"double\"},{ \"name\": \"battery\", \"type\": \"long\"},{ \"name\": \"signal_strength\", \"type\": \"long\"},{ \"name\": \"mode\", \"type\": \"string\"},{ \"name\": \"active\", \"type\": \"boolean\"}]}"
}'

я использую

  • авро-с 1.11.3
  • libserdes 8.0.0

Ниже приведен код, который может успешно записать данные в Kafka:

#include <avro.h>
#include <glib.h>
#include <librdkafka/rdkafka.h>
#include <libserdes/serdes-avro.h>
#include <libserdes/serdes.h>
#include <signal.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include "config.c"

#define ARR_SIZE(arr) (sizeof((arr)) / sizeof((arr[0])))

static volatile bool is_running = true;

static void delivery_report(rd_kafka_t *kafka_handle,
                            const rd_kafka_message_t *rkmessage, void *opaque) {
  if (rkmessage->err) {
    g_error("Message delivery failed: %s", rd_kafka_err2str(rkmessage->err));
  }
}

void signal_handler(int signal) {
  if (signal == SIGINT || signal == SIGTERM) {
    is_running = false;
  }
}

int main(int argc, char **argv) {
  rd_kafka_t *producer;
  rd_kafka_conf_t *conf;
  serdes_conf_t *serdes_conf;
  serdes_t *serdes;
  char errstr[512];

  if (argc != 2) {
    g_error("Usage: %s <config.ini>", argv[0]);
  }

  const char *config_file = argv[1];

  g_autoptr(GError) error = NULL;
  g_autoptr(GKeyFile) key_file = g_key_file_new();
  if (!g_key_file_load_from_file(key_file, config_file, G_KEY_FILE_NONE, &error)) {
    g_error("Error loading config file: %s", error->message);
  }

  conf = rd_kafka_conf_new();
  load_config_group(conf, key_file, "default");

  rd_kafka_conf_set(conf, "queue.buffering.max.messages", "10000000", NULL, 0);
  rd_kafka_conf_set(conf, "queue.buffering.max.kbytes", "10485760", NULL, 0);
  rd_kafka_conf_set(conf, "batch.size", "65536", NULL, 0);
  rd_kafka_conf_set(conf, "linger.ms", "5", NULL, 0);
  rd_kafka_conf_set_dr_msg_cb(conf, delivery_report);

  producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
  if (!producer) {
    g_error("Failed to create new producer: %s", errstr);
  }

  signal(SIGINT, signal_handler);
  signal(SIGTERM, signal_handler);

  serdes_conf = serdes_conf_new(
      NULL, 0, "schema.registry.url",
      "https://confluent-schema-registry.example.com", NULL);

  serdes = serdes_new(serdes_conf, errstr, sizeof(errstr));
  if (!serdes) {
    g_error("Failed to create serdes instance: %s", errstr);
  }

  const char *topic = "production.iot.device.avro";
  const char *schema_name = "production.iot.device.avro-value";
  serdes_schema_t *serdes_schema =
      serdes_schema_get(serdes, schema_name, -1, errstr, sizeof(errstr));
  if (!serdes_schema) {
    g_error("Failed to retrieve AVRO schema: %s", errstr);
  }

  const char *device_ids[6] = {"device1", "device2", "device3",
                               "device4", "device5", "device6"};
  const char *status_list[3] = {"online", "offline", "maintenance"};
  const char *locations[3] = {"locationA", "locationB", "locationC"};
  const char *types[3] = {"type1", "type2", "type3"};

  srandom(time(NULL));  // Seed the random number generator

  while (is_running) {
    const char *key = device_ids[random() % ARR_SIZE(device_ids)];

    const char *status = status_list[random() % ARR_SIZE(status_list)];
    const char *location = locations[random() % ARR_SIZE(locations)];
    const char *type = types[random() % ARR_SIZE(types)];
    double temperature = ((double)random() / RAND_MAX) * 100.0 - 50.0;
    double humidity = ((double)random() / RAND_MAX);
    int battery = random() % 101;
    int signal_strength = random() % 101;
    const char *mode = (random() % 2) ? "manual" : "auto";
    bool active = (random() % 2);

    avro_schema_t schema = serdes_schema_avro(serdes_schema);
    avro_value_iface_t *record_class = avro_generic_class_from_schema(schema);

    avro_value_t record;
    avro_generic_value_new(record_class, &record);

    avro_value_t field;
    if (avro_value_get_by_name(&record, "status", &field, NULL) == 0) {
      avro_value_set_string(&field, status);
    }
    if (avro_value_get_by_name(&record, "location", &field, NULL) == 0) {
      avro_value_set_string(&field, location);
    }
    if (avro_value_get_by_name(&record, "type", &field, NULL) == 0) {
      avro_value_set_string(&field, type);
    }
    if (avro_value_get_by_name(&record, "temperature", &field, NULL) == 0) {
      avro_value_set_long(&field, temperature);
    }
    if (avro_value_get_by_name(&record, "humidity", &field, NULL) == 0) {
      avro_value_set_double(&field, humidity);
    }
    if (avro_value_get_by_name(&record, "battery", &field, NULL) == 0) {
      avro_value_set_long(&field, battery);
    }
    if (avro_value_get_by_name(&record, "signal_strength", &field, NULL) == 0) {
      avro_value_set_long(&field, signal_strength);
    }
    if (avro_value_get_by_name(&record, "mode", &field, NULL) == 0) {
      avro_value_set_string(&field, mode);
    }
    if (avro_value_get_by_name(&record, "active", &field, NULL) == 0) {
      avro_value_set_boolean(&field, active);
    }

    void *avro_payload = NULL;
    size_t avro_size;
    serdes_err_t serr = serdes_schema_serialize_avro(serdes_schema, &record, &avro_payload, &avro_size, errstr, sizeof(errstr));
    if (serr != SERDES_ERR_OK) {
      g_error("Failed to serialize data: %s", serdes_err2str(serr));
    }

    rd_kafka_resp_err_t err;
    err = rd_kafka_producev(producer, RD_KAFKA_V_TOPIC(topic),
                            RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                            RD_KAFKA_V_KEY((void *)key, strlen(key)),
                            RD_KAFKA_V_VALUE(avro_payload, avro_size),
                            RD_KAFKA_V_OPAQUE(NULL), RD_KAFKA_V_END);

    if (err) {
      g_error("Failed to produce to topic %s: %s", topic,
              rd_kafka_err2str(err));
    }

    free(avro_payload);
    avro_value_decref(&record);
    rd_kafka_poll(producer, 0);
    g_usleep(50);  // μs
  }

  g_message("Flushing final messages ...");
  rd_kafka_flush(producer, 10 * 1000);

  if (rd_kafka_outq_len(producer) > 0) {
    g_error("%d message(s) were not delivered", rd_kafka_outq_len(producer));
  }

  g_message("Producer stopped.");
  rd_kafka_destroy(producer);
  serdes_schema_destroy(serdes_schema);
  serdes_destroy(serdes);
  return 0;
}

Теперь, для полной совместимости схемы, я постарался сделать каждое поле необязательным. Я удалил старую схему и воссоздал новую с помощью

curl --location 'https://confluent-schema-registry.example.com/subjects/production.iot.device.avro-value/versions' \
--header 'Content-Type: application/vnd.schemaregistry.v1+json' \
--data '{
    "schema": "{\"type\": \"record\", \"name\": \"device\", \"fields\":[{ \"name\": \"status\", \"type\": [\"null\", \"string\"], \"default\": null},{ \"name\": \"location\", \"type\": [\"null\", \"string\"], \"default\": null},{ \"name\": \"type\", \"type\": [\"null\", \"string\"], \"default\": null},{ \"name\": \"temperature\", \"type\": [\"null\", \"long\"], \"default\": null},{ \"name\": \"humidity\", \"type\": [\"null\", \"double\"], \"default\": null},{ \"name\": \"battery\", \"type\": [\"null\", \"long\"], \"default\": null},{ \"name\": \"signal_strength\", \"type\": [\"null\", \"long\"], \"default\": null},{ \"name\": \"mode\", \"type\": [\"null\", \"string\"], \"default\": null},{ \"name\": \"active\", \"type\": [\"null\", \"boolean\"], \"default\": null}]}"
}'

Но теперь тот же код даст сбой в строке serdes_err_t serr = serdes_schema_serialize_avro(schema, &record, &avro_payload, &avro_size, errstr, sizeof(errstr)); и вернет ошибку:

Не удалось сериализовать данные: сбой сериализатора.

Согласно моим исследованиям, «Сбой сериализатора» соответствует SERDES_ERR_SERIALIZER , и эта часть в библиотеке libserdes вызвала эту ошибку. Функция avro_value_sizeof внутри не удалась, и вот avro_value_sizeof исходный код.

На основе этой информации я добавил этот раздел для воспроизведения:

int aerr = avro_value_sizeof(&record, &avro_size);
if (aerr) {
  printf("%s\n", strerror(aerr));
}

Я вижу, что это распечатывается

Неверный аргумент

Однако я не уверен, что вызвало эту проблему. Любой гид будет признателен, спасибо!


ОБНОВЛЕНИЕ 1

В моем исходном коде я добавил

void print_serdes_schema(serdes_schema_t *serdes_schema) {
    if (!serdes_schema) {
        printf("serdes_schema is NULL.\n");
        return;
    }

    const char *schema_name = serdes_schema_name(serdes_schema);
    if (schema_name) {
        printf("schema_name: %s\n", schema_name);
    } else {
        printf("Failed to retrieve schema_name.\n");
    }

    int schema_id = serdes_schema_id(serdes_schema);
    printf("schema_id: %d\n", schema_id);

    const char *schema_definition = serdes_schema_definition(serdes_schema);
    if (schema_definition) {
        printf("serdes_schema: %s\n", schema_definition);
    } else {
        printf("Failed to retrieve serdes_schema.\n");
    }
}

print_schema_info(serdes_schema); возвращает:

schema_name: production.iot.device.avro-value
schema_id: 17
serdes_schema: {"type":"record","name":"device","fields":[{"name":"status","type":["null","string"],"default":null},{"name":"location","type":["null","string"],"default":null},{"name":"type","type":["null","string"],"default":null},{"name":"temperature","type":["null","long"],"default":null},{"name":"humidity","type":["null","double"],"default":null},{"name":"battery","type":["null","long"],"default":null},{"name":"signal_strength","type":["null","long"],"default":null},{"name":"mode","type":["null","string"],"default":null},{"name":"active","type":["null","boolean"],"default":null}]}

Это означает, что функция serdes_schema_get работает хорошо.


ОБНОВЛЕНИЕ 2

В моем исходном коде я добавил

void print_avro_schema(avro_schema_t schema) {
  char schema_str [8192];
  avro_writer_t writer = avro_writer_memory(schema_str, 8192);
  avro_schema_to_json(schema, writer);
  printf("schema: %s\n", schema_str);
}

print_avro_schema(schema); распечатаю

{"type":"record","name":"device","fields":[{"name":"status","type":[{"type":"null"},{"type":"string"}]},{"name":"location","type":[{"type":"null"},{"type":"string"}]},{"name":"type","type":[{"type":"null"},{"type":"string"}]},{"name":"temperature","type":[{"type":"null"},{"type":"long"}]},{"name":"humidity","type":[{"type":"null"},{"type":"double"}]},{"name":"battery","type":[{"type":"null"},{"type":"long"}]},{"name":"signal_strength","type":[{"type":"null"},{"type":"long"}]},{"name":"mode","type":[{"type":"null"},{"type":"string"}]},{"name":"active","type":[{"type":"null"},{"type":"boolean"}]}]}

Обратите внимание, что внутри нет "default":null.

Когда я создаю локальную схему

const char LOCAL_SCHEMA_WITH_OPTIONAL[] = "{\"type\": \"record\", \"name\": \"hongbo_test\", \"fields\":[{ \"name\": \"status\", \"type\": [\"null\", \"string\"], \"default\": null},{ \"name\": \"location\", \"type\": [\"null\", \"string\"], \"default\": null},{ \"name\": \"type\", \"type\": [\"null\", \"string\"], \"default\": null},{ \"name\": \"temperature\", \"type\": [\"null\", \"long\"], \"default\": null},{ \"name\": \"humidity\", \"type\": [\"null\", \"double\"], \"default\": null},{ \"name\": \"battery\", \"type\": [\"null\", \"long\"], \"default\": null},{ \"name\": \"signal_strength\", \"type\": [\"null\", \"long\"], \"default\": null},{ \"name\": \"mode\", \"type\": [\"null\", \"string\"], \"default\": null},{ \"name\": \"active\", \"type\": [\"null\", \"boolean\"], \"default\": null}]}";
avro_schema_t local_schema;
if (avro_schema_from_json_literal(LOCAL_SCHEMA_WITH_OPTIONAL, &local_schema)) {
  fprintf(stderr, "Unable to parse person schema\n");
  return 1;
}
print_avro_schema(local_schema);

Он печатает точно так же. Итак, я думаю "default":null не включена avro_schema_t ожидается переменная типа (?)

Теперь, чтобы воспроизвести ошибку, код можно упростить до

const char LOCAL_SCHEMA_WITH_OPTIONAL[] = "{\"type\": \"record\", \"name\": \"hongbo_test\", \"fields\":[{ \"name\": \"status\", \"type\": [\"null\", \"string\"], \"default\": null},{ \"name\": \"location\", \"type\": [\"null\", \"string\"], \"default\": null},{ \"name\": \"type\", \"type\": [\"null\", \"string\"], \"default\": null},{ \"name\": \"temperature\", \"type\": [\"null\", \"long\"], \"default\": null},{ \"name\": \"humidity\", \"type\": [\"null\", \"double\"], \"default\": null},{ \"name\": \"battery\", \"type\": [\"null\", \"long\"], \"default\": null},{ \"name\": \"signal_strength\", \"type\": [\"null\", \"long\"], \"default\": null},{ \"name\": \"mode\", \"type\": [\"null\", \"string\"], \"default\": null},{ \"name\": \"active\", \"type\": [\"null\", \"boolean\"], \"default\": null}]}";
// const char LOCAL_SCHEMA_WITHOUT_OPTIONAL[] = "{\"type\": \"record\", \"name\": \"hongbo_test\", \"fields\":[{ \"name\": \"status\", \"type\": \"string\"},{ \"name\": \"location\", \"type\": \"string\"},{ \"name\": \"type\", \"type\": \"string\"},{ \"name\": \"temperature\", \"type\": \"long\"},{ \"name\": \"humidity\", \"type\": \"double\"},{ \"name\": \"battery\", \"type\": \"long\"},{ \"name\": \"signal_strength\", \"type\": \"long\"},{ \"name\": \"mode\", \"type\": \"string\"},{ \"name\": \"active\", \"type\": \"boolean\"}]}";
avro_schema_t local_schema;
if (avro_schema_from_json_literal(LOCAL_SCHEMA_WITH_OPTIONAL, &local_schema)) {
  fprintf(stderr, "Unable to parse person schema\n");
  return 1;
}
print_avro_schema(local_schema);

avro_value_iface_t *record_class = avro_generic_class_from_schema(local_schema);

avro_value_t field;
avro_value_t record;
avro_generic_value_new(record_class, &record);

if (avro_value_get_by_name(&record, "status", &field, NULL) == 0) {
  if (avro_value_set_string(&field, status) != 0) {
    printf("avro_value_set_string failed\n");
  }
}
if (avro_value_get_by_name(&record, "location", &field, NULL) == 0) {
  if (avro_value_set_string(&field, location) != 0) {
    printf("avro_value_set_string failed\n");
  }
}
if (avro_value_get_by_name(&record, "type", &field, NULL) == 0) {
  if (avro_value_set_string(&field, type) != 0) {
    printf("avro_value_set_string failed\n");
  }
}
if (avro_value_get_by_name(&record, "temperature", &field, NULL) == 0) {
  if (avro_value_set_long(&field, temperature) != 0) {
    printf("avro_value_set_long failed\n");
  }
}
if (avro_value_get_by_name(&record, "humidity", &field, NULL) == 0) {
  if (avro_value_set_double(&field, humidity) != 0) {
    printf("avro_value_set_double failed\n");
  }
}
if (avro_value_get_by_name(&record, "battery", &field, NULL) == 0) {
  if (avro_value_set_long(&field, battery) != 0) {
    printf("avro_value_set_long failed\n");
  }
}
if (avro_value_get_by_name(&record, "signal_strength", &field, NULL) == 0) {
  if (avro_value_set_long(&field, signal_strength) != 0) {
    printf("avro_value_set_long failed\n");
  }
}
if (avro_value_get_by_name(&record, "mode", &field, NULL) == 0) {
  if (avro_value_set_string(&field, mode) != 0) {
    printf("avro_value_set_string failed\n");
  }
}
if (avro_value_get_by_name(&record, "active", &field, NULL) == 0) {
  if (avro_value_set_boolean(&field, active) != 0) {
    printf("avro_value_set_boolean failed\n");
  }
}

// For debugging
char *record_str;
avro_value_to_json(&record, 1, &record_str);
printf("record: %s\n", record_str);

void *avro_payload = NULL;
size_t avro_size;

// For debugging, this `avro_value_sizeof` is where it failed inside next function `serdes_schema_serialize_avro`
int aerr = avro_value_sizeof(&record, &avro_size);
if (aerr) {
  printf("avro_value_sizeof: %s\n", strerror(aerr));
}
  • Когда я использую LOCAL_SCHEMA_WITHOUT_OPTIONAL, он напечатает что-то вроде

    record: {"status": "online", "location": "locationA", "type": "type1", "temperature": 38, "humidity": 0.76082854986229376, "battery": 40, "signal_strength": 84, "mode": "auto", "active": true}
    
  • Когда я использую LOCAL_SCHEMA_WITH_OPTIONAL, он напечатает что-то вроде

    avro_value_set_string failed
    avro_value_set_string failed
    avro_value_set_string failed
    avro_value_set_long failed
    avro_value_set_double failed
    avro_value_set_long failed
    avro_value_set_long failed
    avro_value_set_string failed
    avro_value_set_boolean failed
    record: (null)
    avro_value_sizeof: Invalid argument
    

Если ожидается "default":null не включенная avro_schema_t переменная типа, возникает вопрос, как создать правильную record, когда необязательное значение может быть нулевым.

Вызов Schema_get работает должным образом? Название вашей темы заканчивается на json, но в теме схемы написано avro.

OneCricketeer 11.08.2024 05:25

Привет @OneCricketeer, извини за опечатку, когда я редактировал информацию. Я исправил это в своем вопросе. Функция serdes_schema_get работает хорошо. Я также добавил результат в вопрос.

Hongbo Miao 11.08.2024 06:25
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
2
72
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я нашел ошибку: в основном, когда значение может быть необязательным, это тип Union (первый тип в Union — null, второй тип — string, long, double, boolean и т. д.).

Мне нужно добавить дополнительный avro_value_set_branch шаг.

Вот прототип функции avro_value_set_branch:

int avro_value_set_branch(avro_value_t *union_val, int discriminant, avro_value_t *branch);

По сути, эта часть кода

if (avro_value_get_by_name(&record, "status", &field, NULL) == 0) {
  avro_value_set_string(&field, status);
}
if (avro_value_get_by_name(&record, "location", &field, NULL) == 0) {
  avro_value_set_string(&field, location);
}
// ...

нужно обновить до чего-то вроде

if (avro_value_get_by_name(&record, "status", &field, NULL) == 0) {
  if (avro_value_set_branch(&field, 1, &field) == 0) {
    avro_value_set_string(&field, status);
  }
}
if (avro_value_get_by_name(&record, "location", &field, NULL) == 0) {
  if (avro_value_set_branch(&field, 1, &field) == 0) {
    avro_value_set_string(&field, location);
  }
}
// ...

Теперь данные можно успешно записать в Kafka:

Другие вопросы по теме