У нас есть требование выполнять массовые записи в elasticsearch. Мы хотели бы знать, есть ли лучший способ пакетной обработки данных и предотвращения потери данных при пакетной обработке.
public void consume() {
logger.debug("raw consume......");
String topic = "json.incoming";
String consGroup = "rConsumerGroup";
Properties props = new Properties();
props.put("enable.auto.commit", "false");
props.put("session.timeout.ms", "20000");
props.put("max.poll.records", "10000");
consumer = new GenericConsumer<String, JsonNode>().initialize(topic, consGroup, STREAMSERDE.STRINGDESER, STREAMSERDE.JSONDESER, props);
logger.debug("Kafka Consumer Initialized......");
buffer = new ArrayList<MessageVO>();
while (true) {
try {
ConsumerRecords<String, JsonNode> records = consumer.poll(100);
Date startTime = Calendar.getInstance()
.getTime();
if (records.count() == 0 && !buffer.isEmpty()) {
lastSeenZeroPollCounter++;
}
if (records.count() > 0) {
logger.debug(">>records count = " + records.count());
for (ConsumerRecord<String, JsonNode> record : records) {
logger.debug("record.offset() = " + record.offset() + " : record.key() = " + record.key());
JsonNode jsonMessage = record.value();
logger.debug("incoming Message = " + jsonMessage);
ObjectMapper objectMapper = new ObjectMapper();
MessageVO rawMessage = objectMapper.convertValue(jsonMessage, MessageVO.class);
logger.info("Size of the buffer is " + buffer.size());
buffer.add(rawMessage);
}
Date endTime = Calendar.getInstance()
.getTime();
long durationInMilliSec = endTime.getTime() - startTime.getTime();
logger.debug("Number of Records:: " + records.count() + " Time took to process poll :: " + durationInMilliSec);
}
if ((buffer.size() >= 1000 && buffer.size() <= 3000) || (buffer.size() > 0 && lastSeenZeroPollCounter >= 3000)) {
lastSeenZeroPollCounter = 0;
List<RawSyslogMessageVO> clonedBuffer = deepCopy(buffer);
logger.info("The size of clonedBuffer is ::: " + clonedBuffer.size());
writerService.writeRaw(clonedBuffer);
buffer.clear();
}
consumer.commitSync();
} catch (Throwable throwable) {
logger.error("Error occured while processing message", throwable);
throwable.printStackTrace();
}
}
}
Код для клонирования данных, чтобы избежать потери данных
private List<MessageVO> deepCopy(List<MessageVO> messages) {
List<MessageVO> listOfMessages = new ArrayList<>();
logger.debug("DeepClone :: listOfMessages size ::: " + listOfMessages.size());
listOfMessages.addAll(messages);
return Collections.unmodifiableList(messages);
}
Любая помощь приветствуется. Спасибо.




Лучший способ, чем писать его самостоятельно, — это использовать Kafka Connect API Apache Kafka — он был создан специально для потоковой интеграции из систем в Kafka, а Kafka — в другие системы :-)
Соединитель эластичного поиска будет передавать данные из темы Kafka в Elasticsearch с настраиваемыми размерами пакетов и т. д., а также семантикой однократной доставки, масштабируемой обработкой и т. д.
Отказ от ответственности: я работаю в Confluent.
Мы справились с тем же вариантом использования, немного упростив дизайн приложения: мы в основном выполняем следующие шаги.
BatchAcknowledgingMessageListener для получения
записи с max.poll.records, установленные в соответствии с требованиямиСледуя этому более простому дизайну, мы понимаем, что большинство массовых коммитов будут иметь желаемое количество записей. В случае, когда в топике Kafka не так много сообщений, как желаемое количество записей для массового индексирования, мы решаем индексировать все, что доступно в одной выборке в любом случае, вместо того, чтобы явно обрабатывать состояние фиксации, управлять буфером и т. д. в приложении. .
Массовая фиксация Elasticsearch — это оптимизация. Я не вижу причин быть предельно точным в отношении общего количества записей на один массовый запрос. (см. также это руководство).
P.S: Нам нужно было написать код вместо использования соединителя или готовых решений, потому что наши входные данные поступают из нескольких тем в разных форматах, таких как protobuf, заархивированный XML, Json и т. д., и нам нужно было выполнить преобразование формата и сложную десериализацию перед индексированием данных.