Я пытаюсь перенести следующий оператор слияния с PySpark в таблицу ниже (обратите внимание, это моя первая попытка создать таблицу в Stack Overflow с использованием фрагмента HTML, поэтому она показывает таблицу — я думаю, вам нужно нажать на ВЫПОЛНИТЕ ФРАГМЕНТ КОДА, чтобы просмотреть таблицу).
try:
#Perform a merge into the existing table
if allowDuplicates == "true":
(deltadf.alias("t")
.merge(
partdf.alias("s"),
f"s.primary_key_hash = t.primary_key_hash")
.whenNotMatchedInsertAll()
.execute()
)
else:
(deltadf.alias("t")
.merge(
partdf.alias("s"),
"s.primary_key_hash = t.primary_key_hash")
.whenMatchedUpdateAll("s.change_key_hash <> t.change_key_hash")
.whenNotMatchedInsertAll().
execute()
)
Тем не менее, я продолжаю получать сообщение об ошибке:
Невозможно выполнить слияние, так как несколько исходных строк совпали и попытались изменить одну и ту же целевую строку в таблице Delta, возможно, конфликтующую способы. По семантике SQL слияния, когда несколько исходных строк совпадают на одна и та же целевая строка, результат может быть неоднозначным, поскольку он неясен какая исходная строка должна использоваться для обновления или удаления соответствия целевой ряд.
Может кто-нибудь взглянуть на мой код и сообщить мне, почему я продолжаю получать ошибку, пожалуйста.
<html>
<head>
<meta http-equiv = "Content-Type" content = "text/html; charset=Windows-1252" />
<title>Export Data</title>
<style type = "text/css">
.h {
color: Black;
font-family: Tahoma;
font-size: 8pt;
}
table {
border-collapse: collapse;
border-width: 1px;
border-style: solid;
border-color: Silver;
padding: 3px;
}
td {
border-width: 1px;
border-style: solid;
border-color: Silver;
padding: 3px;
}
.rh {
background-color: White;
vertical-align: Top;
color: Black;
font-family: Tahoma;
font-size: 8pt;
text-align: Left;
}
.rt {
background-color: White;
vertical-align: Top;
color: Black;
font-family: Tahoma;
font-size: 8pt;
text-align: Left;
}
</style>
</head>
<bodybgColor=White>
<p class = "h"></p>
<table cellspacing = "0">
<tr class = "rh">
<td>Id</td>
<td>SinkCreatedOn</td>
<td>SinkModifiedOn</td>
</tr>
<tr class = "rt">
<td>AC28CA8A-80B6-EC11-983F-0022480078D3</td>
<td>15/12/2022 14:02:51</td>
<td>15/12/2022 14:02:51</td>
</tr>
<tr class = "rt">
<td>AC28CA8A-80B6-EC11-983F-0022480078D3</td>
<td>16/12/2022 18:30:59</td>
<td>16/12/2022 18:30:59</td>
</tr>
<tr class = "rt">
<td>AC28CA8A-80B6-EC11-983F-0022480078D3</td>
<td>16/12/2022 18:55:04</td>
<td>16/12/2022 18:55:04</td>
</tr>
<tr class = "rt">
<td>AC28CA8A-80B6-EC11-983F-0022480078D3</td>
<td>20/12/2022 16:26:45</td>
<td>20/12/2022 16:26:45</td>
</tr>
<tr class = "rt">
<td>AC28CA8A-80B6-EC11-983F-0022480078D3</td>
<td>22/12/2022 17:27:45</td>
<td>22/12/2022 17:27:45</td>
</tr>
<tr class = "rt">
<td>AC28CA8A-80B6-EC11-983F-0022480078D3</td>
<td>22/12/2022 17:57:48</td>
<td>22/12/2022 17:57:48</td>
</tr>
</table>
<p class = "h"></p>
</body>
</html>
Я собираюсь использовать следующий код дедупликации, как было предложено:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
df2 = partdf.withColumn("rn", row_number().over(Window.partitionBy("primary_key_hash").orderBy("Id")))
df3 = df2.filter("rn = 1").drop("rn")
display(df3)
Чтобы код работал с моим оператором слияния, он должен выглядеть следующим образом:
try:
#Perform a merge into the existing table
if allowDuplicates == "true":
(deltadf.alias("t")
.merge(
df3.alias("s"),
f"s.primary_key_hash = t.primary_key_hash")
.whenNotMatchedInsertAll()
.execute()
)
else:
(deltadf.alias("t")
.merge(
df3.alias("s"),
"s.primary_key_hash = t.primary_key_hash")
.whenMatchedUpdateAll("s.change_key_hash <> t.change_key_hash")
.whenNotMatchedInsertAll().
execute()
)
Вы заметите, что я удалил partdf из оператора слияния и заменил его на df3
Я попытался воспроизвести сценарий и получил ту же ошибку.
Согласно приведенной выше проблеме, в исходной таблице не должно быть повторяющихся полей, которые вы сравниваете в целевой таблице при выполнении над ней операции MERGE. Механизм SQL автоматически выполняет эту проверку, чтобы предотвратить ошибочные модификации и несогласованность данных.
Простое решение заключается в том, что логика дедупликации должна присутствовать до процесса MERGE, чтобы избежать этой проблемы. Вы можете быстро попытаться устранить дубликаты, используя window functions, dropduplicates fuction удаление повторяющихся строк или любую другую логику в соответствии с вашими потребностями:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
df2 = partdf.withColumn("rn", row_number().over(Window.partitionBy("P_key").orderBy("Id")))
df3 = df2.filter("rn = 1").drop("rn")
display(df3)
Выполнено успешно с созданным выше кадром данных:
Привет @Pratik, спасибо, что связались. Собираюсь попробовать это предложение в ближайшие несколько часов.
Привет, @Pratik, кажется, твое заявление о дедупликации работает очень хорошо. У меня просто проблема с тем, где добавить df3 в мой оператор слияния. Я обновил вопрос, указав, где я думал включить переменную df3. С нетерпением жду вашего ответа
Да, вам просто нужно заменить partdf из оператора слияния на df3
Привет @Pratik, я только что сделал это, и это сработало. Спасибо большое дружище.
Привет @Pratik, просто хотел еще раз сказать спасибо.
Привет @Pratik, ваш код работает, однако мне было интересно, не могли бы вы объяснить, что на самом деле делает код: df2 = partdf.withColumn("rn", row_number().over(Window.partitionBy("P_key").orderBy("Id"))) df3 = df2.filter("rn = 1").drop("rn")
Привет, Шарма, как ты знаешь, я задавал этот вопрос некоторое время назад :-). Разница между сейчас и тогда заключается в том, что проблема была вызвана дубликатами первичного ключа. Однако, как вы можете видеть, если вы «Запустите фрагмент кода», у меня нет дубликатов в первичном ключе.