Создание пакетов из записей опроса в kafka

У нас есть требование выполнять массовые записи в elasticsearch. Мы хотели бы знать, есть ли лучший способ пакетной обработки данных и предотвращения потери данных при пакетной обработке.

 public void consume() {
        logger.debug("raw consume......");

        String topic = "json.incoming";
        String consGroup = "rConsumerGroup";

        Properties props = new Properties();
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "20000");
        props.put("max.poll.records", "10000");

        consumer = new GenericConsumer<String, JsonNode>().initialize(topic, consGroup, STREAMSERDE.STRINGDESER, STREAMSERDE.JSONDESER, props);
        logger.debug("Kafka Consumer Initialized......");
        buffer = new ArrayList<MessageVO>();

        while (true) {
            try {
                ConsumerRecords<String, JsonNode> records = consumer.poll(100);
                Date startTime = Calendar.getInstance()
                    .getTime();
                if (records.count() == 0 && !buffer.isEmpty()) {
                    lastSeenZeroPollCounter++;
                }
                if (records.count() > 0) {
                    logger.debug(">>records count = " + records.count());
                    for (ConsumerRecord<String, JsonNode> record : records) {
                        logger.debug("record.offset() = " + record.offset() + " : record.key() = " + record.key());
                        JsonNode jsonMessage = record.value();
                        logger.debug("incoming Message = " + jsonMessage);
                        ObjectMapper objectMapper = new ObjectMapper();
                        MessageVO rawMessage = objectMapper.convertValue(jsonMessage, MessageVO.class);
                        logger.info("Size of the buffer is " + buffer.size());
                        buffer.add(rawMessage);
                    }
                    Date endTime = Calendar.getInstance()
                        .getTime();
                    long durationInMilliSec = endTime.getTime() - startTime.getTime();
                    logger.debug("Number of Records:: " + records.count() + " Time took to process poll :: " + durationInMilliSec);
                }
                if ((buffer.size() >= 1000 && buffer.size() <= 3000) || (buffer.size() > 0 && lastSeenZeroPollCounter >= 3000)) {
                    lastSeenZeroPollCounter = 0;
                    List<RawSyslogMessageVO> clonedBuffer = deepCopy(buffer);
                    logger.info("The size of clonedBuffer is ::: " + clonedBuffer.size());
                    writerService.writeRaw(clonedBuffer);
                    buffer.clear();
                }

                consumer.commitSync();
            } catch (Throwable throwable) {
                logger.error("Error occured while processing message", throwable);
                throwable.printStackTrace();
            }
        }
    }

Код для клонирования данных, чтобы избежать потери данных

 private List<MessageVO> deepCopy(List<MessageVO> messages) {
        List<MessageVO> listOfMessages = new ArrayList<>();
        logger.debug("DeepClone :: listOfMessages size ::: " + listOfMessages.size());
        listOfMessages.addAll(messages);
        return Collections.unmodifiableList(messages);
    }

Любая помощь приветствуется. Спасибо.

Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
0
509
2

Ответы 2

Лучший способ, чем писать его самостоятельно, — это использовать Kafka Connect API Apache Kafka — он был создан специально для потоковой интеграции из систем в Kafka, а Kafka — в другие системы :-)

Соединитель эластичного поиска будет передавать данные из темы Kafka в Elasticsearch с настраиваемыми размерами пакетов и т. д., а также семантикой однократной доставки, масштабируемой обработкой и т. д.

Отказ от ответственности: я работаю в Confluent.

Мы справились с тем же вариантом использования, немного упростив дизайн приложения: мы в основном выполняем следующие шаги.

  1. Используйте Spring Kafka BatchAcknowledgingMessageListener для получения записи с max.poll.records, установленные в соответствии с требованиями
  2. Для каждой выборки фиксируйте сообщения с помощью Elasticsearch BulkRequest API.
  3. После успешного массового индексирования подтвердите Kafka.
  4. В случае сбоя повторите попытку или обработайте ошибку

Следуя этому более простому дизайну, мы понимаем, что большинство массовых коммитов будут иметь желаемое количество записей. В случае, когда в топике Kafka не так много сообщений, как желаемое количество записей для массового индексирования, мы решаем индексировать все, что доступно в одной выборке в любом случае, вместо того, чтобы явно обрабатывать состояние фиксации, управлять буфером и т. д. в приложении. .

Массовая фиксация Elasticsearch — это оптимизация. Я не вижу причин быть предельно точным в отношении общего количества записей на один массовый запрос. (см. также это руководство).

P.S: Нам нужно было написать код вместо использования соединителя или готовых решений, потому что наши входные данные поступают из нескольких тем в разных форматах, таких как protobuf, заархивированный XML, Json и т. д., и нам нужно было выполнить преобразование формата и сложную десериализацию перед индексированием данных.

Другие вопросы по теме