Как получить значения rdd, существующие в массиве?

У меня Rdd[(Int, Double)] и array[Int], и я хочу получить новый Rdd[(Int, Double)] только с теми Int, которые тоже существуют в массиве.

Например, если мой array - это [0, 1, 2], а мой rdd - это (1, 4.2), (5, 4.3), я хочу получить в качестве вывода rdd только (1, 4.2).

Я думаю об использовании filter с функцией, которая выполняет итерацию по массиву, выполняет сравнение и возвращает true/false, но я не уверен, что это логика spark.

Что-то вроде:

val newrdd = rdd.filter(x => f(x._1, array)) 

куда

f(x:Int, y:Array[In]): Boolean ={
   val z = false 
   for (a<-0 to y.length-1){
         if (x == y(a)){
            z = true
            z}
       z

}

0
0
485
3

Ответы 3

// Ввод rdd

val rdd = sc.parallelize(Seq((1,4.2),(5,4.3)))

// массив, конвертируем в rdd

val arrRdd = sc.parallelize(Array(0,1,2))

// конвертируем rdd и arrRdd в dataframe

val arrDF = arrRdd.toDF()
val df = rdd.toDF()

// присоединяемся и снова конвертируем в rdd

df.join(arrDF,df.col("_1") === arrDF.col("value"),"leftsemi").rdd.collect

// выводим массив ([1,4.2])

val acceptableValues = array.toSet
rdd.filter { case (x, _) => acceptableValues(x) }

Это кажется простым, но у меня есть Type mismatch, expected: (Int, Double) => Boolean, actual: Boolean.

Tmpoul 13.09.2018 20:13

Вероятно, это связано с версией Scala. Я изменил ответ, чтобы от компилятора требовалось меньше выводов.

Hosam Aly 14.09.2018 10:36

Хотя этот фрагмент кода может решить проблему, он не объясняет, почему и как он отвечает на вопрос. Пожалуйста, включите объяснение вашего кода, так как это действительно помогает улучшить качество вашего сообщения. Помните, что вы отвечаете на вопрос читателей в будущем, и эти люди могут не знать причины вашего предложения кода.

Luca Kiebel 14.09.2018 13:01

Попробуй это:

rdd.filter(x => Array(0,1,2).contains(x._1)).collect.foreach(println)

Выход:

(1,4.2) 

Другие вопросы по теме