Ошибка Databricks: невозможно выполнить слияние, так как несколько исходных строк совпали и попытались изменить одну и ту же целевую строку в разностной таблице конфликтующим образом.

Я пытаюсь перенести следующий оператор слияния с PySpark в таблицу ниже (обратите внимание, это моя первая попытка создать таблицу в Stack Overflow с использованием фрагмента HTML, поэтому она показывает таблицу — я думаю, вам нужно нажать на ВЫПОЛНИТЕ ФРАГМЕНТ КОДА, чтобы просмотреть таблицу).

try:
  #Perform a merge into the existing table
  if allowDuplicates == "true":
    (deltadf.alias("t")
       .merge(
        partdf.alias("s"),
        f"s.primary_key_hash = t.primary_key_hash")
      .whenNotMatchedInsertAll()
     .execute()
    )
  else:
    (deltadf.alias("t")
       .merge(
        partdf.alias("s"),
        "s.primary_key_hash = t.primary_key_hash")
      .whenMatchedUpdateAll("s.change_key_hash <> t.change_key_hash")
      .whenNotMatchedInsertAll().
     execute()
    )

Тем не менее, я продолжаю получать сообщение об ошибке:

Невозможно выполнить слияние, так как несколько исходных строк совпали и попытались изменить одну и ту же целевую строку в таблице Delta, возможно, конфликтующую способы. По семантике SQL слияния, когда несколько исходных строк совпадают на одна и та же целевая строка, результат может быть неоднозначным, поскольку он неясен какая исходная строка должна использоваться для обновления или удаления соответствия целевой ряд.

Может кто-нибудь взглянуть на мой код и сообщить мне, почему я продолжаю получать ошибку, пожалуйста.

<html>

<head>
  <meta http-equiv = "Content-Type" content = "text/html; charset=Windows-1252" />
  <title>Export Data</title>
  <style type = "text/css">
    .h {
      color: Black;
      font-family: Tahoma;
      font-size: 8pt;
    }
    
    table {
      border-collapse: collapse;
      border-width: 1px;
      border-style: solid;
      border-color: Silver;
      padding: 3px;
    }
    
    td {
      border-width: 1px;
      border-style: solid;
      border-color: Silver;
      padding: 3px;
    }
    
    .rh {
      background-color: White;
      vertical-align: Top;
      color: Black;
      font-family: Tahoma;
      font-size: 8pt;
      text-align: Left;
    }
    
    .rt {
      background-color: White;
      vertical-align: Top;
      color: Black;
      font-family: Tahoma;
      font-size: 8pt;
      text-align: Left;
    }
  </style>
</head>
<bodybgColor=White>
  <p class = "h"></p>
  <table cellspacing = "0">

    <tr class = "rh">
      <td>Id</td>
      <td>SinkCreatedOn</td>
      <td>SinkModifiedOn</td>
    </tr>

    <tr class = "rt">
      <td>AC28CA8A-80B6-EC11-983F-0022480078D3</td>
      <td>15/12/2022 14:02:51</td>
      <td>15/12/2022 14:02:51</td>
    </tr>

    <tr class = "rt">
      <td>AC28CA8A-80B6-EC11-983F-0022480078D3</td>
      <td>16/12/2022 18:30:59</td>
      <td>16/12/2022 18:30:59</td>
    </tr>

    <tr class = "rt">
      <td>AC28CA8A-80B6-EC11-983F-0022480078D3</td>
      <td>16/12/2022 18:55:04</td>
      <td>16/12/2022 18:55:04</td>
    </tr>

    <tr class = "rt">
      <td>AC28CA8A-80B6-EC11-983F-0022480078D3</td>
      <td>20/12/2022 16:26:45</td>
      <td>20/12/2022 16:26:45</td>
    </tr>

    <tr class = "rt">
      <td>AC28CA8A-80B6-EC11-983F-0022480078D3</td>
      <td>22/12/2022 17:27:45</td>
      <td>22/12/2022 17:27:45</td>
    </tr>

    <tr class = "rt">
      <td>AC28CA8A-80B6-EC11-983F-0022480078D3</td>
      <td>22/12/2022 17:57:48</td>
      <td>22/12/2022 17:57:48</td>
    </tr>

  </table>
  <p class = "h"></p>
  </body>

</html>

Я собираюсь использовать следующий код дедупликации, как было предложено:

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

df2 = partdf.withColumn("rn", row_number().over(Window.partitionBy("primary_key_hash").orderBy("Id")))
df3 = df2.filter("rn = 1").drop("rn")
display(df3)

Чтобы код работал с моим оператором слияния, он должен выглядеть следующим образом:

try:
  #Perform a merge into the existing table
  if allowDuplicates == "true":
    (deltadf.alias("t")
       .merge(
        df3.alias("s"),
        f"s.primary_key_hash = t.primary_key_hash")
      .whenNotMatchedInsertAll()
     .execute()
    )
  else:
    (deltadf.alias("t")
       .merge(
        df3.alias("s"),
        "s.primary_key_hash = t.primary_key_hash")
      .whenMatchedUpdateAll("s.change_key_hash <> t.change_key_hash")
      .whenNotMatchedInsertAll().
     execute()
    )

Вы заметите, что я удалил partdf из оператора слияния и заменил его на df3

Привет, Шарма, как ты знаешь, я задавал этот вопрос некоторое время назад :-). Разница между сейчас и тогда заключается в том, что проблема была вызвана дубликатами первичного ключа. Однако, как вы можете видеть, если вы «Запустите фрагмент кода», у меня нет дубликатов в первичном ключе.

Patterson 10.01.2023 18:18
Инструменты для веб-скрапинга с открытым исходным кодом: Python Developer Toolkit
Инструменты для веб-скрапинга с открытым исходным кодом: Python Developer Toolkit
Веб-скрейпинг, как мы все знаем, это дисциплина, которая развивается с течением времени. Появляются все более сложные средства борьбы с ботами, а...
Калькулятор CGPA 12 для семестра
Калькулятор CGPA 12 для семестра
Чтобы запустить этот код и рассчитать CGPA, необходимо сохранить код как HTML-файл, а затем открыть его в веб-браузере. Для этого выполните следующие...
ONLBest Online HTML CSS JAVASCRIPT Training In INDIA 2023
ONLBest Online HTML CSS JAVASCRIPT Training In INDIA 2023
О тренинге HTML JavaScript :HTML (язык гипертекстовой разметки) и CSS (каскадные таблицы стилей) - две основные технологии для создания веб-страниц....
Как собрать/развернуть часть вашего приложения Angular
Как собрать/развернуть часть вашего приложения Angular
Вам когда-нибудь требовалось собрать/развернуть только часть вашего приложения Angular или, возможно, скрыть некоторые маршруты в определенных средах?
Запуск PHP на IIS без использования программы установки веб-платформы
Запуск PHP на IIS без использования программы установки веб-платформы
Установщик веб-платформы, предлагаемый компанией Microsoft, перестанет работать 31 декабря 2022 года. Его закрытие привело к тому, что мы не можем...
Оптимизация React Context шаг за шагом в 4 примерах
Оптимизация React Context шаг за шагом в 4 примерах
При использовании компонентов React в сочетании с Context вы можете оптимизировать рендеринг, обернув ваш компонент React в React.memo сразу после...
0
1
134
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я попытался воспроизвести сценарий и получил ту же ошибку.

Согласно приведенной выше проблеме, в исходной таблице не должно быть повторяющихся полей, которые вы сравниваете в целевой таблице при выполнении над ней операции MERGE. Механизм SQL автоматически выполняет эту проверку, чтобы предотвратить ошибочные модификации и несогласованность данных.

Простое решение заключается в том, что логика дедупликации должна присутствовать до процесса MERGE, чтобы избежать этой проблемы. Вы можете быстро попытаться устранить дубликаты, используя window functions, dropduplicates fuction удаление повторяющихся строк или любую другую логику в соответствии с вашими потребностями:

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

df2 = partdf.withColumn("rn", row_number().over(Window.partitionBy("P_key").orderBy("Id")))
df3 = df2.filter("rn = 1").drop("rn")
 
display(df3)

Выполнено успешно с созданным выше кадром данных:

Привет @Pratik, спасибо, что связались. Собираюсь попробовать это предложение в ближайшие несколько часов.

Patterson 11.01.2023 11:36

Привет, @Pratik, кажется, твое заявление о дедупликации работает очень хорошо. У меня просто проблема с тем, где добавить df3 в мой оператор слияния. Я обновил вопрос, указав, где я думал включить переменную df3. С нетерпением жду вашего ответа

Patterson 11.01.2023 12:50

Да, вам просто нужно заменить partdf из оператора слияния на df3

Pratik Lad 11.01.2023 13:01

Привет @Pratik, я только что сделал это, и это сработало. Спасибо большое дружище.

Patterson 11.01.2023 13:03

Привет @Pratik, просто хотел еще раз сказать спасибо.

Patterson 12.01.2023 12:28

Привет @Pratik, ваш код работает, однако мне было интересно, не могли бы вы объяснить, что на самом деле делает код: df2 = partdf.withColumn("rn", row_number().over(Window.partitionBy("P_key").orderBy("Id"))‌​) df3 = df2.filter("rn = 1").drop("rn")

Patterson 18.01.2023 12:32

Другие вопросы по теме