Стратегия восстановления kafka producer при использовании паркетного файла

У меня есть вариант использования, когда мне нужно читать файлы паркета и публиковать записи в теме кафки.

Я читал паркетные файлы, используя:

spark.read.schema(//path to parquet file )

Затем я сортирую этот фрейм данных на основе отметки времени (требования к конкретному варианту использования для сохранения порядка) Наконец, я делаю следующее:

binaryFiles ниже - это фрейм данных, содержащий отсортированные записи из паркетного файла.

binaryFiles.coalesce(1).foreachPartition(partition => {
    val producer = new KafkaProducer[String,Array[Byte]](properties)
    partition.foreach(file => {
      try {  
         var producerRecord = new ProducerRecord[String,Array[Byte]](targetTopic,file.getAs[Integer](2),file.getAs[String](0),file.getAs[Array[Byte]](1))
          var metadata = producer.send(producerRecord, new Callback {
                                  override def onCompletion(recordMetadata:RecordMetadata , e:Exception):Unit = {
                                      if (e != null){
                                          println ("Error while producing" +  e);
                                          producer.close()
                                      }
                                  }
                              }); 
              producer.flush()          
        }
      catch{
          case  unknown :Throwable =>  println("Exception obtained with record. Key :  " + unknown)
          }      
    })
  producer.close() 
  println("Closing the producer for this partition")
  })

При написании стратегии аварийного переключения для этого сценария я пытаюсь удовлетворить один из сценариев: что произойдет, если узел, на котором запущен производитель kafka, выйдет из строя.

Теперь, когда производитель kafka перезапускается, он снова будет читать файл parquet с самого начала и снова начнет публиковать все записи в той же теме.

Как мы можем преодолеть это и реализовать своего рода контрольную точку, которую предоставляет искровый поток.

PS: я не могу использовать структурированную потоковую передачу искр, так как это не сохраняет порядок сообщений

Вам нужно будет сохранить хотя бы имя файла и количество записей с самого начала, которые были отправлены и подтверждены. Где и как вы храните эту информацию, зависит от вас. Zookeeper - один из вариантов. Или вы можете добавить поле, например временную метку в данных, и тогда вы сможете обойти проблемы с упорядочением.

OneCricketeer 31.10.2018 14:27

@ cricket_007: в данных уже есть столбец с отметкой времени. Я попробовал структурированную потоковую передачу искры, чтобы увидеть, может ли это быть вариантом. При внимательном рассмотрении порядок внутри раздела сохраняется, но между разделами это спорно.

Sumit Baurai 31.10.2018 16:09

извините .. отправлено раньше по ошибке .. .option("checkpointLocation",checkPointLocation) - пытается использовать контрольную точку из потока kafka (пакет), однако он снова отправляет весь набор записей из файла parquet. Не знаю, как помогает эта контрольная точка.

Sumit Baurai 31.10.2018 16:09
val topic = "SSOrder2" val checkPointLocation = "adl://<<azure_data_lake_Store>>/CheckPoints/OrderTest" + topic binaryFiles.coalesce(1).write .format("kafka") .option("kafka.bootstrap.servers",servers") .option("topic",topic) .option("kafka.sasl.mechanism", "SCRAM-SHA-512") .option("kafka.security.protocol", "SASL_PLAINTEXT") .option("kafka.max.in.flight.requests.per.connection", "1") .option("checkpointLocation",checkPointLocation) .save()
Sumit Baurai 31.10.2018 16:12
2
4
230
0

Другие вопросы по теме