Как сохранить сообщение JSONLines RDD от кафки

я получил 1 сообщение в кафке, состоящее из нескольких независимых строк json. я хочу передать это сообщение в hdfs. проблема в том, что мой код сохраняет только самый первый json и игнорирует остальные.

пример 1 сообщения кафки (не несколько сообщений):

{"field": "1"}
{"field": "2"}
{"field": "3"}

часть кода scala:

 val stream = KafkaSource.kafkaStream[String, String, StringDecoder, StringDecoder](
      streamingContext, brokers, new ZooKeeperOffsetsStore(zkQuorum, zkPath), topic)
    stream.foreachRDD(rdd => {
      if (!rdd.isEmpty) {

        val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()

        val df = spark.sqlContext.read.format(rdd.map(m => m._2))

        df.write.mode(SaveMode.Append).format("json").save(outputPath)
      }

    })

конкретное решение лежит в части rdd.map(m => m._2), где мне нужно отобразить все линии, а не только первую. мне кажется, что сам rdd уже вырезан и не содержит остальных строк json.

Как сделать HTTP-запрос в Javascript?
Как сделать HTTP-запрос в Javascript?
В JavaScript вы можете сделать HTTP-запрос, используя объект XMLHttpRequest или более новый API fetch. Вот пример для обоих методов:
0
0
74
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я решил это, работая с текстом вместо json. основная разница заключается в преобразовании toDF():

stream.foreachRDD(rdd => {

      if (!rdd.isEmpty) {        
        //works as .txt file: 
        rdd.map(m => m._2).toDF().coalesce(1).write.mode(SaveMode.Append).format("text").save(outputPath)


      }
    })

Другие вопросы по теме