В литейном цехе palantir я пытаюсь прочитать все xml-файлы из набора данных. Затем в цикле for я анализирую xml-файлы.
До предпоследней строки код работает нормально, без ошибок.
from transforms.api import transform, Input, Output
from transforms.verbs.dataframes import sanitize_schema_for_parquet
from bs4 import BeautifulSoup
import pandas as pd
import lxml
@transform(
output=Output("/Spring/xx/datasets/mydataset2"),
source_df=Input("ri.foundry.main.dataset.123"),
)
def read_xml(ctx, source_df, output):
df = pd.DataFrame()
filesystem = source_df.filesystem()
hadoop_path = filesystem.hadoop_path
files = [f"{hadoop_path}/{f.path}" for f in filesystem.ls()]
for i in files:
with open(i, 'r') as f:
file = f.read()
soup = BeautifulSoup(file,'xml')
data = []
for e in soup.select('offer'):
data.append({
'meldezeitraum': e.find_previous('data').get('meldezeitraum'),
'id':e.get('id'),
'parent_id':e.get('parent_id'),
})
df = df.append(data)
output.write_dataframe(sanitize_schema_for_parquet(df))
Однако, как только я добавлю последнюю строку:
output.write_dataframe(sanitize_schema_for_parquet(df))
Я получаю эту ошибку:
Missing transform attribute
A DataFrame object does not have an attribute select. Please check the spelling and/or the datatype of the object.
/transforms-python/src/myproject/datasets/mydataset.py
output.write_dataframe(sanitize_schema_for_parquet(df))
Что я делаю не так?
Вы должны преобразовать свой DataFrame pandas в искровой DataFrame. Несмотря на то, что они имеют одно и то же имя, это два разных типа объектов в python.
Самый простой способ сделать это
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df_spark = spark.createDataFrame(df)
Затем вы можете передать spark_df
функции output.write_dataframe()