Я получаю данные JSON от брокеров Kafka и читаю их с помощью Spark Streaming и Scala. Ниже приведены данные примера:
{"timestamp":"2020-12-11 22:35:00.000000 UTC","tech":"Spark","version":2,"start_time":1607725688402210,"end_time":1607726131636059}
Я получаю эти данные как RDD[String] в своем коде Scala, теперь я хочу прочитать определенный ключ из каждой строки данных, например, «версию» из приведенных выше данных. Я могу сделать это следующим образом:
for(record <- rdd){
val jsonRecord = JSON.parseFull(record );
val globalMap = jsonRecord.get.asInstanceOf[Map[String, Any]]
val version = globalMap.get("version").get.asInstanceOf[String]
}
Но я не уверен, что это лучший способ чтения RDD с данными JSON. Пожалуйста, предложите.
Спасибо,
Я использую прямой поток для получения необработанного потока, а затем зацикливаюсь на нем.
Версия спарка 2.3.0.2.
почему вы используете dstream, он устарел?
У меня есть работающий код в производстве, и я не могу вносить в него большие изменения. На данный момент мне нужно добиться некоторого варианта использования с существующей кодовой базой. Спасибо,
каков ваш ожидаемый результат?
Мне нужно получить некоторые конкретные ключи из сообщений и выполнить некоторые вычисления для них. Окончательный расчет для перехода в плоский файл.
мне кажется, что вы используете scala.util.parsing.json.JSON, есть ли у вас другие библиотеки Json, которые вы можете использовать?
Используйте библиотеку json4s
для анализа данных json, и она будет доступна со искрой по умолчанию, нет необходимости импортировать дополнительные библиотеки.
Проверьте код ниже.
scala> rdd.collect.foreach(println)
{"timestamp":"2020-12-11 22:35:00.000000 UTC","tech":"Spark","version":2,"start_time":1607725688402210,"end_time":1607726131636059}
scala> :paste
// Entering paste mode (ctrl-D to finish)
rdd.map{ row =>
// Import required libraries for json parsers.
import org.json4s._
import org.json4s.jackson.JsonMethods._
implicit val formats = DefaultFormats
// parse json message using parse function from json4s lib.
val jsonData = parse(row)
// extract required fields from parsed json data.
// extracting version field value
val version = (jsonData \\ "version").extract[Int]
// extracting timestamp field value
val timestamp = (jsonData \\ "timestamp").extract[String]
(version,timestamp)
}
.collect
.foreach(println)
// Exiting paste mode, now interpreting.
(2,2020-12-11 22:35:00.000000 UTC)
Спасибо, Шринивас. Это хорошо подходит для моего варианта использования.
Вы можете использовать искровую структурированную потоковую передачу? или не хотите?