Я получаю ошибку CDRS.toDF()
error
case class CDR(phone:String, first_type:String,in_out:String,local:String,duration:String,date:String,time:String,roaming:String,amount:String,in_network:String,is_promo:String,toll_free:String,bytes:String,last_type:String)
// Create direct Kafka stream with brokers and topics
//val topicsSet = Set[String] (kafka_topic)
val topicsSet = Set[String] (kafka_topic)
val kafkaParams = Map[String, String]("metadata.broker.list" ->
kafka_broker)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,
StringDecoder](
ssc, kafkaParams, topicsSet).map(_._2)
//===============================================================================================
//Apply Schema Of Class CDR to Message Coming From Kafka
val CDRS = messages.map(_.split('|')).map(x=> CDR
(x(0),x(1),x(2),x(3),x(4),x(5),x(6),x(7),x(8),x(9),x(10),x(11),x(12),x(13).repla ceAll("\n","")))
Это DStream
, а DStreams
не имеет метода toDF
. Ожидается ошибка компиляции. И если вы хотите добавить информацию к своему вопросу, не используйте комментарии. Просто перейдите по ссылке редактировать.
Спасибо. не могли бы вы направить мне ссылку. как я могу преобразовать DStream в RDD и в Data Frame?
Ты не можешь. Вы можете dstream.foreachRDD(rdd => rdd.toDF)
, но похоже, что вы действительно ищете Структурированная потоковая передача
Спасибо. Вы сказали "Верно". Я преобразовал поток D в foreachRDD, а затем преобразовал .toDf ()
case class CDR (phone: String, first_type: String, in_out: String, local: String, duration: String, date: String, time: St ring, roaming: String, amount: String, in_net work: String , is_promo: String, toll_free: St ring, bytes: String, la st_type: String)