Чтение JSON RDD с использованием Spark Scala

Я получаю данные JSON от брокеров Kafka и читаю их с помощью Spark Streaming и Scala. Ниже приведены данные примера:

{"timestamp":"2020-12-11 22:35:00.000000 UTC","tech":"Spark","version":2,"start_time":1607725688402210,"end_time":1607726131636059}

Я получаю эти данные как RDD[String] в своем коде Scala, теперь я хочу прочитать определенный ключ из каждой строки данных, например, «версию» из приведенных выше данных. Я могу сделать это следующим образом:

for(record <- rdd){
  val jsonRecord = JSON.parseFull(record );
  val globalMap = jsonRecord.get.asInstanceOf[Map[String, Any]]
  val version = globalMap.get("version").get.asInstanceOf[String]
}

Но я не уверен, что это лучший способ чтения RDD с данными JSON. Пожалуйста, предложите.

Спасибо,

Вы можете использовать искровую структурированную потоковую передачу? или не хотите?

d-xa 12.12.2020 00:46

Я использую прямой поток для получения необработанного потока, а затем зацикливаюсь на нем.

sgmbd 12.12.2020 01:01

Версия спарка 2.3.0.2.

sgmbd 12.12.2020 01:05

почему вы используете dstream, он устарел?

Srinivas 12.12.2020 01:08

У меня есть работающий код в производстве, и я не могу вносить в него большие изменения. На данный момент мне нужно добиться некоторого варианта использования с существующей кодовой базой. Спасибо,

sgmbd 12.12.2020 01:11

каков ваш ожидаемый результат?

Srinivas 12.12.2020 01:25

Мне нужно получить некоторые конкретные ключи из сообщений и выполнить некоторые вычисления для них. Окончательный расчет для перехода в плоский файл.

sgmbd 12.12.2020 01:28

мне кажется, что вы используете scala.util.parsing.json.JSON, есть ли у вас другие библиотеки Json, которые вы можете использовать?

d-xa 12.12.2020 13:45
Как сделать HTTP-запрос в Javascript?
Как сделать HTTP-запрос в Javascript?
В JavaScript вы можете сделать HTTP-запрос, используя объект XMLHttpRequest или более новый API fetch. Вот пример для обоих методов:
0
8
250
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Используйте библиотеку json4s для анализа данных json, и она будет доступна со искрой по умолчанию, нет необходимости импортировать дополнительные библиотеки.

Проверьте код ниже.

scala> rdd.collect.foreach(println)

{"timestamp":"2020-12-11 22:35:00.000000 UTC","tech":"Spark","version":2,"start_time":1607725688402210,"end_time":1607726131636059}

scala> :paste
// Entering paste mode (ctrl-D to finish)

rdd.map{ row =>

    // Import required libraries for json parsers.
    import org.json4s._
    import org.json4s.jackson.JsonMethods._
    implicit val formats = DefaultFormats

    // parse json message using parse function from json4s lib.

    val jsonData = parse(row)

    // extract required fields from parsed json data.

    // extracting version field value
    val version = (jsonData \\ "version").extract[Int] 

    // extracting timestamp field value
    val timestamp = (jsonData \\ "timestamp").extract[String] 

    (version,timestamp)
}
.collect
.foreach(println)


// Exiting paste mode, now interpreting.

(2,2020-12-11 22:35:00.000000 UTC)

Спасибо, Шринивас. Это хорошо подходит для моего варианта использования.

sgmbd 14.12.2020 05:55

Другие вопросы по теме