Почему я получаю исключение при использовании подсказки соединения диапазона?

Я пытаюсь использовать метод DataFrame.подсказка(), чтобы добавить Подсказка соединения диапазона к моему соединению.

У меня есть две таблицы: minutes и events.

В таблице минут есть столбцы minute_start и minute_end, которые показывают время в секундах с фиксированного момента времени. Естественно, их значения кратны 60.

Таблица событий имеет аналогичные столбцы event_start и event_end, только для событий. События могут начинаться и заканчиваться в любую секунду.

Для каждого события мне нужно найти все минуты, с которыми оно совпадает.

Я пробую это на Databricks (время выполнения 5.1, Python 3.5):

# from pyspark.sql.types import StructType, StructField, IntegerType

# minutes = spark.sparkContext\
#                .parallelize(((0,  60),
#                              (60, 120)))\
#                .toDF(StructType([
#                          StructField('minute_start', IntegerType()),
#                          StructField('minute_end', IntegerType())
#                        ]))

# events = spark.sparkContext\
#               .parallelize(((12, 33),
#                             (0,  120),
#                             (33, 72),
#                             (65, 178)))\
#               .toDF(StructType([
#                         StructField('event_start', IntegerType()),
#                         StructField('event_end', IntegerType())
#                       ]))

events.hint("range_join", "60")\
      .join(minutes,
            on=[events.event_start   < minutes.minute_end,
                minutes.minute_start < events.event_end])\
      .orderBy(events.event_start,
               events.event_end,
               minutes.minute_start)\
      .show()

Без вызова hint результат такой, как и ожидалось:

+-----------+---------+------------+----------+
|event_start|event_end|minute_start|minute_end|
+-----------+---------+------------+----------+
|          0|      120|           0|        60|
|          0|      120|          60|       120|
|         12|       33|           0|        60|
|         33|       72|           0|        60|
|         33|       72|          60|       120|
|         65|      178|          60|       120|
+-----------+---------+------------+----------+

С hint я получаю исключение:

AnalysisException: 'Range join hint: invalid arguments Buffer(60);'

Когда я попытался передать 60 в подсказке как число, а не строку, он пожаловался, что параметр подсказки должен быть строкой.

Меня зовут не на азуре, но я ожидаю, что результат будет таким же.

Кто-нибудь имел подобную проблему и нашел решение или знает, где я делаю ошибку?

ОБНОВЛЕНИЕ 1

(В настоящее время я пробую это на Databricks Runtime 6.1, Python 3.7.3, Spark 2.4.4)

Я думал, что пропустил, что параметры ожидаются как итерируемые, поэтому я попробовал еще раз с events.hint("range_join", [60]). Та же жалоба на то, что аргумент не является строкой: TypeError: all parameters should be str, got 60 of type <class 'int'>.

Мне интересно, отстала ли версия Spark от Databricks.

Это в Spark исходный код на GitHub:

def hint(self, name, *parameters):
    ... (no checks on `parameters` up to here)

    allowed_types = (basestring, list, float, int)
    for p in parameters:
        if not isinstance(p, allowed_types):
            raise TypeError(
                "all parameters should be in {0}, got {1} of type {2}".format(
                        allowed_types, p, type(p)))

    ... (no checks beyond this point)

поэтому список int должен быть разрешен.

Я получаю all parameters should be str, но версия GitHub скажет all parameters should be in (basestring, list, float, int), если я передам параметр неправильного типа.

ОБНОВЛЕНИЕ 2

hint("skew", "col_name") вроде работает.

У меня такая же проблема.

Marcus Lind 11.12.2019 15:53

Пока я не нашел решения.

Arseny 11.12.2019 23:20

Я обнаружил, что вместо использования .hint() вы можете настроить конфигурацию сеанса Spark для использования ячеек объединения диапазонов, если вы используете блоки данных docs.databricks.com/delta/join-performance/….

Marcus Lind 12.12.2019 14:19

@MarcusLind Да, это одна вещь, которую я использовал, но, похоже, она зависит от платформы, и я бы хотел, чтобы код также работал локально. Вы также можете просто написать это на SQL: spark.sql("SELECT /*+ RANGE_JOIN(events, 60)*/ ... "

Arseny 13.12.2019 20:12
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
5
4
2 004
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Я проверил исходный код Spark на GitHub.

Версия 2.4.4 имеет это:

def hint(self, name, *parameters):
    ...  # no checks on `parameters` up to here

    for p in parameters:
        if not isinstance(p, str):
            raise TypeError(
                "all parameters should be str, got {0} of type {1}".format(p, type(p)))

    ...  # no checks beyond here

Но начиная с версия 3.0.0-превью-rc1 в источнике есть это:

def hint(self, name, *parameters):
    ...  # no checks on `parameters` up to here

    allowed_types = (basestring, list, float, int)
    for p in parameters:
        if not isinstance(p, allowed_types):
            raise TypeError(
                "all parameters should be in {0}, got {1} of type {2}".format(
                    allowed_types, p, type(p)))

    ...  # no checks beyond here

Таким образом, кажется, что в версии 2.4.4 была ошибка, которая была исправлена ​​в версиях, начиная с 3.0.0-preview-rc1.

Я также проверил 2.4.5, и ошибка все еще там, поэтому нам, вероятно, придется ждать 3.0 на Databricks.

Arseny 26.03.2020 19:28

Создал PR, чтобы вернуть это обратно в ветку 2.4: github.com/apache/spark/pull/28238

Fokko Driesprong 17.04.2020 08:21

@FokkoDriesprong, похоже, они не согласны с тем, что это ошибка, потому что Range Join отсутствует в документации API Apache pySpark. Ну, я бы сказал, что .hint("range_join", param) вызывает исключение, которое не говорит, что "range_join" является недопустимым типом подсказки, а скорее говорит, что param просто неправильный тип, предполагая, что все еще существуют правильные типы для param. Таким образом, в основном это позволяет использовать любой "hint_type" независимо от того, знает ли JVM, как с этим обращаться или нет. Мне это кажется в лучшем случае плохой обработкой исключений. Я понимаю, что вы можете определить свои собственные подсказки, это открытый исходный код, но...

Arseny 07.05.2020 02:10

@FokkoDriesprong продолжение ... код никогда не достигает точки, когда понимает: «О, но этот тип подсказки нигде не определен в вашем коде, так что теперь ваше исключение». Хотя я не чувствую себя сильным в том, чтобы настаивать на этом.

Arseny 07.05.2020 02:12

Другие вопросы по теме