У меня довольно сложный процесс создания фрейма данных pyspark, его преобразования в фрейм данных pandas и вывода результата в плоский файл. Я не уверен, в какой момент возникает ошибка, поэтому опишу весь процесс.
Для начала у меня есть фреймворк pyspark, который содержит попарное сходство для наборов идентификаторов. Это выглядит так:
+------+-------+-------------------+
| ID_A| ID_B| EuclideanDistance|
+------+-------+-------------------+
| 1| 1| 0.0|
| 1| 2|0.13103884200454394|
| 1| 3| 0.2176246463836219|
| 1| 4| 0.280568636550471|
...
Мне нравится группировать их по ID_A, сортировать каждую группу по Евклидовому расстоянию и брать только верхние N пар для каждой группы. Итак, сначала я делаю это:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col, row_number
window = Window.partitionBy(df['ID_A']).orderBy(df_sim['EuclideanDistance'])
result = (df.withColumn('row_num', row_number().over(window)))
Я убеждаюсь, что ID_A = 1 все еще находится в фрейме данных «результата». Затем я делаю это, чтобы ограничить каждую группу до 20 строк:
result1 = result.where(result.row_num<20)
result1.toPandas().to_csv("mytest.csv")
и ID_A = 1 НЕ входит в результирующий файл .csv (хотя он все еще присутствует в result1). Есть ли где-нибудь в этой цепочке преобразований проблема, которая может привести к потере данных?
Вы ссылаетесь на 2 фрейма данных в окне своего решения. Не уверен, что это вызывает вашу ошибку, но это стоит исправить. В любом случае вам не нужно ссылаться на конкретный фрейм данных в определение окна. В любом случае попробуйте
window = Window.partitionBy('ID_A').orderBy('EuclideanDistance')
Как упоминал Дэвид, вы ссылаетесь на второй фрейм данных «df_sim» в своей оконной функции.
Я протестировал следующее, и он работает на моей машине (известные последние слова):
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col, row_number
import pandas as pd
#simulate some data
df = pd.DataFrame({'ID_A': pd.np.arange(100)%5,
'ID_B': pd.np.repeat(pd.np.arange(20),5),
'EuclideanDistance': pd.np.random.rand(100)*5}
)
#artificially set distance between point and self to 0
df['EuclideanDistance'][df['ID_A'] == df['ID_B']] = 0
df = spark.createDataFrame(df)
#end simulation
window = Window.partitionBy(df['ID_A']).orderBy(df['EuclideanDistance'])
output = df.select('*', row_number().over(window).alias('rank')).filter(col('rank') <= 10)
output.show(50)
Код моделирования нужен только для того, чтобы сделать этот пример самодостаточным. Конечно, вы можете использовать свой реальный фрейм данных и игнорировать моделирование при ее тестировании. Надеюсь, что это сработает!