Создайте общую функцию для объединения нескольких наборов данных в pyspark

Привет, я создаю общую функцию или класс для добавления n наборов данных, но я не могу найти правильную логику для этого, я помещаю все коды ниже и выделяю раздел, в котором мне нужна помощь. если вы обнаружите какие-либо проблемы с пониманием моего кода, пожалуйста, отправьте мне пинг.

import pyspark
  
# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession
  
# creating sparksession and giving an app name
spark = SparkSession.builder.appName('sparkdf').getOrCreate()

data_fact = [["1", "sravan", "company 1","100"],
        ["2", "ojaswi", "company 1","200"], 
        ["3", "rohith", "company 2","300"],
        ["4", "sridevi", "company 1","400"], 
        ["5", "bobby", "company 1","500"]]
  
# specify column names
columns = ['ID', 'NAME', 'Company','Amount']
  
# creating a dataframe from the lists of data
df_fact = spark.createDataFrame(data_fact, columns)

Department_table = [["1", "45000", "IT"],
         ["2", "145000", "Manager"],
         ["6", "45000", "HR"],
         ["5", "34000", "Sales"]]
  
# specify column names
columns1 = ['ID', 'salary', 'department']
df_Department = spark.createDataFrame(Department_table, columns1)

Leave_Table = [["1", "Sick Leave"],
         ["2", "Casual leave"],
         ["3", "Casual leave"],
         ["4", "Earned Leave"],
         ["4", "Sick Leave"] ]
  
# specify column names
columns2 = ['ID', 'Leave_type']
df_Leave = spark.createDataFrame(Leave_Table, columns2)

Phone_Table = [["1", "Apple"],
         ["2", "Samsung"],
         ["3", "MI"],
         ["4", "Vivo"],
         ["4", "Apple"] ]
  
# specify column names
columns3 = ['ID', 'Phone_type']
 
df_Phone = spark.createDataFrame(Phone_Table, columns3)





 Df_join = df_fact.join(df_Department,df_fact.ID ==df_Department.ID,"inner")\
 .join(df_Phone,df_fact.ID ==df_Phone.ID,"inner")\
 .join(df_Leave,df_fact.ID ==df_Leave.ID,"inner")\
.select(df_fact.Amount,df_Department.ID,df_Department.salary,df_Department.department,df_Phone.Phone_type,df_Leave.Leave_type)

display(Df_join)


в основном, я хочу обобщить этот материал для n наборов данных

 Df_join = df_fact.join(df_Department,df_fact.ID ==df_Department.ID,"inner")\
 .join(df_Phone,df_fact.ID ==df_Phone.ID,"inner")\
 .join(df_Leave,df_fact.ID ==df_Leave.ID,"inner")\
.select(df_fact.Amount,df_Department.ID,df_Department.salary,df_Department.department,df_Phone.Phone_type,df_Leave.Leave_type) ```
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
0
271
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Поскольку вы используете inner соединение во всех кадрах данных, если вы хотите предотвратить громоздкий код, вы можете использовать .reduce() в functools для объединения и выбора нужного столбца:

df = reduce(lambda x, y: x.join(y, on='id', how='inner'), [df_fact, df_Department, df_Leave, df_Phone])
df.show(10, False)
+---+------+---------+------+------+----------+------------+----------+
|ID |NAME  |Company  |Amount|salary|department|Leave_type  |Phone_type|
+---+------+---------+------+------+----------+------------+----------+
|1  |sravan|company 1|100   |45000 |IT        |Sick Leave  |Apple     |
|2  |ojaswi|company 1|200   |145000|Manager   |Casual leave|Samsung   |
+---+------+---------+------+------+----------+------------+----------+

https://docs.python.org/3/library/functools.html#functools.reduce


Редактировать 1: Если вам нужно указать другой ключ в разных соединениях, учитывая, что вы уже переименовали столбцы:

df = reduce(lambda x, y: x.join(y, on=list(set(x.columns)&set(y.columns)), how='inner'), [df_fact, df_Department, df_Leave, df_Phone])
df.show(10, False)
+---+------+---------+------+------+----------+------------+----------+
|ID |NAME  |Company  |Amount|salary|department|Leave_type  |Phone_type|
+---+------+---------+------+------+----------+------------+----------+
|1  |sravan|company 1|100   |45000 |IT        |Sick Leave  |Apple     |
|2  |ojaswi|company 1|200   |145000|Manager   |Casual leave|Samsung   |
+---+------+---------+------+------+----------+------------+----------+

Что произойдет, если мне придется присоединиться....table1.id == table2.departmentid, table1.nameid== table3.name Я имею в виду, что столбец присоединения продолжает меняться, можем ли мы это сделать?

Vivek Kumar 28.11.2022 04:48

@VivekKumar Вы можете проверить, помогает ли мое обновление.

Jonathan Lam 28.11.2022 07:28

Прежде всего большое спасибо. если мы обновим фрейм данных df_Leave и изменим его «ID» на «Leave_ID», тогда он выдаст IndexError: индекс списка вне диапазона, потому что ID и Leave_ID не совпадают, можем ли мы сделать что-то вроде этого: df_fact.ID == df_leave.Leave_ID и, пожалуйста, свяжите некоторые документы для лучшего понимания.

Vivek Kumar 28.11.2022 08:31

Я думаю, что вы не можете объявить другой ключ соединения в другом фрейме данных, если хотите использовать reduce, так как вы не можете изменить лямбда-функцию во время выполнения. Если вам действительно нужно использовать свой подход, возможно, следует использовать простой цикл for.

Jonathan Lam 28.11.2022 08:47

не могли бы вы обновить и добавить логику цикла

Vivek Kumar 28.11.2022 08:52

Я могу обновить ответ, но вы хотите попробовать? Я имею в виду, что самое простое решение состоит в том, что вы используете два списка, один для фрейма данных и один для ключа, с одинаковым порядком и соединяете его в цикле.

Jonathan Lam 29.11.2022 15:55

Другие вопросы по теме