Pyspark: rdd и операция "где"

Я изучаю, как обрабатывать Spark RDD с помощью Python, и я не нахожу решения в соответствии с rdd.filter() с условием where.

У меня есть файл CSV, который выглядит так:

id,firstname,city,age,job,salary,childen,awards
1, Yves, OLS-ET-RINHODES, 55, Pilote de chasse, 3395, 3, 3
2, Paul, MARTOT, 32, Pilote d'helicoptere, 2222, 4, 5
3, Steve, DIEULEFIT, 53, Navigateur aerien, 2152, 3, 2
4, Valentin, FEUILLADE, 27, Pilote de chasse, 1776, 0, 2
...

А это мой скрипт на Python:

#!/usr/bin/python
# -*- coding: utf-8 -*-

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession

#Context properties
conf = SparkConf().setAppName("Aeroport")
sc = SparkContext(conf=conf)

#Data Reading
data = sc.textFile("hdfs://master:9000/testfile.csv")

#Split each column
dataset = data.map(lambda l: l.split(','))

#Search children number by city
nbChildByCity = dataset.map(lambda row : (row[2],1)).reduceByKey(lambda a,b:a+b)

print "Nombre enfant par ville naissance : " + str(nbChildByCity.collect())

#Search children number by city with father > 50 years old
nbChildByCityFather = dataset.filter(lambda row : row[3] > 50 in nbChildByCity)
#nbChildByCityFather = dataset.filter(lambda row : row[3] > 50 in row[1]) 

print "Nombre enfant par ville naissance avec père > 50 ans : " + str(nbChildByCityFather.collect()) 

Моя проблема: # Искать количество детей по городам с отцом старше 50 лет

Не побоюсь добавить последнее условие: father > 50 years old. Как мне записать условие where в RDD?

Я пробовал это:

nbChildByCityFather = dataset.filter(lambda row : row[3] > 50 in nbChildByCity)
nbChildByCityFather = dataset.filter(lambda row : row[3] > 50 in row[1]) 

Но результата нет ..

nbChildByCityFather = nbChildByCity.filter (лямбда-строка: строка [3]> 50)

pissall 11.04.2018 12:34
0
1
63
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Это проще и эффективнее реализовать с помощью API фрейма данных (см. Альтернативный подход внизу).

Чтобы получить количество записей, в которых возраст в строке превышает 50, сначала необходимо выполнить фильтрацию. Вам также необходимо использовать столбец возраста (индекс 6) в вашем вызове reduce:

Количество детей по городам:

nbChildByCity = data.map(lambda row : (row[2], int(row[6].strip()))) 
                     #note that it's using child count, not 1

nbChildByCity.collect()

Выходы:

[(' OLS-ET-RINHODES', 3), (' MARTOT', 4), (' DIEULEFIT', 3), (' FEUILLADE', 0)]

То же, но с wi:

nbChildByCity50 = rdd.filter(lambda l: int(l[3]) > 50 )\
                     .map(lambda row : (row[2], int(row[6].strip()) ))\
                     .reduceByKey(lambda a,b:a+b)
print("Nombre enfant par ville naissance :" + str(nbChildByCity50.collect()))

Выходы:

Nombre enfant par ville naissance :[(' OLS-ET-RINHODES', 3), (' DIEULEFIT', 3)]



Обратите внимание, что это проще и целесообразнее сделать с помощью API фрейма данных:

df = spark.read.csv('cities.csv', header=True, inferSchema=True)
grp = df.groupBy(['city'])
grp.sum('childen').show()

Который дает:

+----------------+------------+
|            city|sum(childen)|
+----------------+------------+
|       FEUILLADE|         0.0|
|          MARTOT|         4.0|
|       DIEULEFIT|         3.0|
| OLS-ET-RINHODES|         3.0|
+----------------+------------+

И с фильтром по возрасту:

grp = df.where('age > 50').groupBy(['city'])
grp.sum('childen').show()

Какие выходы:

+----------------+------------+
|            city|sum(childen)|
+----------------+------------+
|       DIEULEFIT|         3.0|
| OLS-ET-RINHODES|         3.0|
+----------------+------------+

Если я хочу использовать Spark для подобных вещей, что лучше: RDD или Dataframe? Оба используют узлы кластера? Есть более быстрый подход?

Essex 11.04.2018 14:46

Фреймы данных - это API более высокого уровня, в котором используются RDD. Все зависит от функциональности, которую вам нужно достичь, но отдайте предпочтение фреймам данных. Также приятно отметить, что фреймы данных и RDD совместимы ... Проверьте страницу документации SQL / RDD, она подробно объяснена!

ernest_k 11.04.2018 14:51

Да, я знал о двойственности между RDD / Dataframes;) И оба использовали узлы кластера для выполнения заданий, верно?

Essex 11.04.2018 14:53

О да! Оба используют распределенный вычислительный движок Spark, как обычно, работающий на узлах кластера.

ernest_k 11.04.2018 14:54

Ок, прекрасно ! Большое спасибо ! Это непросто, когда кто-то начинает со Spark и распределенной среды, но я хорошо понимаю, что фреймы данных и синтаксис проще;)

Essex 11.04.2018 14:55

Вам стоит filter первыйперед применением reduceByKey

nbChildByCityFather = dataset.filter(lambda row : int(row[3].strip()) > 50).map(lambda row : (row[2],1)).reduceByKey(lambda a,b:a+b)
print "Nombre enfant par ville naissance avec pere > 50 ans : " + str(nbChildByCityFather.collect())

Примечание: этот метод работает только в том случае, если вы удалите строку заголовка из файла csv или как-то отфильтруете ее.

Другие вопросы по теме