Я работаю с иерархическими данными в PySpark, где у каждого сотрудника есть менеджер, и мне нужно найти всех встроенных менеджеров для каждого сотрудника. Встроенный менеджер определяется как менеджер менеджера и так далее, пока не дойдет до менеджера верхнего уровня (CEO), у которого нет менеджера.
from pyspark.sql.functions import udf, broadcast, col
from pyspark.sql.types import ArrayType, StringType
# Broadcast the DataFrame df
broadcast_df = broadcast(df)
# Define a function to find inline managers
def find_inline_managers(user_id, manager_id):
inline_managers = []
while manager_id is not None:
manager = broadcast_df.filter(col("user_id") == manager_id).select("username").first()[0]
inline_managers.append(f"{manager}_level_{len(inline_managers) + 1}")
manager_id = broadcast_df.filter(col("user_id") == manager_id).select("manager_id").first()[0]
return inline_managers
# Register the UDF
find_inline_managers_udf = udf(find_inline_managers, ArrayType(StringType()))
# Apply the UDF to create the new column
df = df.withColumn("inline_managers", find_inline_managers_udf("user_id", "manager_id"))
Здесь я разработал udf find_inline_managers для создания производного столбца «inline_mangers», но получаю следующее сообщение об ошибке:
PicklingError: Не удалось сериализовать объект: RuntimeError: Похоже, вы пытаетесь ссылаться на SparkContext из широковещательной переменной, действия или преобразования. SparkContext можно использовать только в драйвере, а не в коде, который он запускает на рабочих процессах. Для получения дополнительной информации см. СПАРК-5063.
Как мы можем это исправить? Если вы знаете какой-либо альтернативный способ решения этой проблемы, дайте мне знать, спасибо Примечание. Рекурсивный CTE не поддерживается в Databricks.





Обязательно ли использовать Pyspark в Databricks?
Если да, этот ответ может вам помочь. Он делает именно это.
https://stackoverflow.com/a/77627393/3238085
Вот еще одно решение, которое может вам помочь. К сожалению, человек, задавший вопрос, удалил его.
https://gist.github.com/dineshdharme/7c13dcde72e42fdd3ec47d1ad40f6177
Поскольку это постановка задачи в виде дерева или графа, библиотека обработки, такая как NetworkX или Graphframes, или графический инструмент может быть более полезной.
Я бы предложил изучить их. Я считаю, что лучше всего подойдет NetworkX или графический инструмент.
Что такое графический инструмент?
Graph-tool — это эффективный модуль Python для манипулирования и статистического анализа графиков (т.н. сетей). В отличие от большинства других модулей Python с аналогичной функциональностью, основные структуры данных и алгоритмы реализованы на C++, широко используя метапрограммирование шаблонов, в значительной степени основанное на библиотеке Boost Graph. Это обеспечивает уровень производительности, сравнимый (как по использованию памяти, так и по времени вычислений) с уровнем производительности чистой библиотеки C/C++.
Graph-инструмент может быть на несколько порядков быстрее, чем альтернативы, основанные только на Python, и поэтому он особенно подходит для крупномасштабного сетевого анализа.
https://graph-tool.skewed.de/static/doc/index.html#installing-graph-tool
conda create --name gt -c conda-forge graph-tool
conda activate gt
Ниже приведен простой пример того, как это сделать:
from graph_tool.all import Graph, graph_draw, BFSVisitor, bfs_search
from graph_tool.draw import radial_tree_layout
data = [
("CEO", "M3"),
("M3", "M1"),
("M3", "M2"),
("M1", "E1"),
("M1", "E2"),
("M2", "E3")
]
g = Graph(directed=True)
vertex_dict = {}
def get_vertex(name):
if name not in vertex_dict:
v = g.add_vertex()
vertex_dict[name] = v
v_name[v] = name
return vertex_dict[name]
v_name = g.new_vertex_property("string")
for manager, employee in data:
v_manager = get_vertex(manager)
v_employee = get_vertex(employee)
g.add_edge(v_manager, v_employee)
class PathCollector(BFSVisitor):
def __init__(self, pred_map):
self.pred_map = pred_map
def tree_edge(self, e):
self.pred_map[e.target()] = e.source()
def find_paths(root):
pred_map = g.new_vertex_property("int64_t", -1)
bfs_search(g, root, PathCollector(pred_map))
def get_path_to(v):
path = []
while v != root and g.vertex(v) != None:
path.append(v)
v = pred_map[v]
path.append(root)
path.reverse()
return path
paths = {v_name[v]: [v_name[x] for x in get_path_to(v)] for v in vertex_dict.values() if v != root}
return paths
root_vertex = vertex_dict["CEO"]
paths = find_paths(root_vertex)
for emp, path in paths.items():
path_reversed = reversed(path)
print(f"Path from {emp} to CEO: {' -> '.join(path_reversed)}")
pos = radial_tree_layout(g, g.vertex(vertex_dict["CEO"]))
graph_draw(g, pos, vertex_text=v_name, vertex_font_size=18,
output_size=(1000, 1000), output = "manager-employee.png")
Выход :
Path from M3 to CEO: M3 -> CEO
Path from M1 to CEO: M1 -> M3 -> CEO
Path from M2 to CEO: M2 -> M3 -> CEO
Path from E1 to CEO: E1 -> M1 -> M3 -> CEO
Path from E2 to CEO: E2 -> M1 -> M3 -> CEO
Path from E3 to CEO: E3 -> M2 -> M3 -> CEO
Изображение, созданное библиотекой: