Я пытаюсь использовать метод DataFrame.подсказка(), чтобы добавить Подсказка соединения диапазона к моему соединению.
У меня есть две таблицы: minutes
и events
.
В таблице минут есть столбцы minute_start
и minute_end
, которые показывают время в секундах с фиксированного момента времени. Естественно, их значения кратны 60.
Таблица событий имеет аналогичные столбцы event_start
и event_end
, только для событий. События могут начинаться и заканчиваться в любую секунду.
Для каждого события мне нужно найти все минуты, с которыми оно совпадает.
Я пробую это на Databricks (время выполнения 5.1, Python 3.5):
# from pyspark.sql.types import StructType, StructField, IntegerType
# minutes = spark.sparkContext\
# .parallelize(((0, 60),
# (60, 120)))\
# .toDF(StructType([
# StructField('minute_start', IntegerType()),
# StructField('minute_end', IntegerType())
# ]))
# events = spark.sparkContext\
# .parallelize(((12, 33),
# (0, 120),
# (33, 72),
# (65, 178)))\
# .toDF(StructType([
# StructField('event_start', IntegerType()),
# StructField('event_end', IntegerType())
# ]))
events.hint("range_join", "60")\
.join(minutes,
on=[events.event_start < minutes.minute_end,
minutes.minute_start < events.event_end])\
.orderBy(events.event_start,
events.event_end,
minutes.minute_start)\
.show()
Без вызова hint
результат такой, как и ожидалось:
+-----------+---------+------------+----------+
|event_start|event_end|minute_start|minute_end|
+-----------+---------+------------+----------+
| 0| 120| 0| 60|
| 0| 120| 60| 120|
| 12| 33| 0| 60|
| 33| 72| 0| 60|
| 33| 72| 60| 120|
| 65| 178| 60| 120|
+-----------+---------+------------+----------+
С hint
я получаю исключение:
AnalysisException: 'Range join hint: invalid arguments Buffer(60);'
Когда я попытался передать 60
в подсказке как число, а не строку, он пожаловался, что параметр подсказки должен быть строкой.
Меня зовут не на азуре, но я ожидаю, что результат будет таким же.
Кто-нибудь имел подобную проблему и нашел решение или знает, где я делаю ошибку?
ОБНОВЛЕНИЕ 1
(В настоящее время я пробую это на Databricks Runtime 6.1, Python 3.7.3, Spark 2.4.4)
Я думал, что пропустил, что параметры ожидаются как итерируемые, поэтому я попробовал еще раз с events.hint("range_join", [60])
. Та же жалоба на то, что аргумент не является строкой: TypeError: all parameters should be str, got 60 of type <class 'int'>
.
Мне интересно, отстала ли версия Spark от Databricks.
Это в Spark исходный код на GitHub:
def hint(self, name, *parameters):
... (no checks on `parameters` up to here)
allowed_types = (basestring, list, float, int)
for p in parameters:
if not isinstance(p, allowed_types):
raise TypeError(
"all parameters should be in {0}, got {1} of type {2}".format(
allowed_types, p, type(p)))
... (no checks beyond this point)
поэтому список int
должен быть разрешен.
Я получаю all parameters should be str
, но версия GitHub скажет all parameters should be in (basestring, list, float, int)
, если я передам параметр неправильного типа.
ОБНОВЛЕНИЕ 2
hint("skew", "col_name")
вроде работает.
Пока я не нашел решения.
Я обнаружил, что вместо использования .hint()
вы можете настроить конфигурацию сеанса Spark для использования ячеек объединения диапазонов, если вы используете блоки данных docs.databricks.com/delta/join-performance/….
@MarcusLind Да, это одна вещь, которую я использовал, но, похоже, она зависит от платформы, и я бы хотел, чтобы код также работал локально. Вы также можете просто написать это на SQL: spark.sql("SELECT /*+ RANGE_JOIN(events, 60)*/ ... "
Я проверил исходный код Spark на GitHub.
Версия 2.4.4 имеет это:
def hint(self, name, *parameters):
... # no checks on `parameters` up to here
for p in parameters:
if not isinstance(p, str):
raise TypeError(
"all parameters should be str, got {0} of type {1}".format(p, type(p)))
... # no checks beyond here
Но начиная с версия 3.0.0-превью-rc1 в источнике есть это:
def hint(self, name, *parameters):
... # no checks on `parameters` up to here
allowed_types = (basestring, list, float, int)
for p in parameters:
if not isinstance(p, allowed_types):
raise TypeError(
"all parameters should be in {0}, got {1} of type {2}".format(
allowed_types, p, type(p)))
... # no checks beyond here
Таким образом, кажется, что в версии 2.4.4 была ошибка, которая была исправлена в версиях, начиная с 3.0.0-preview-rc1.
Я также проверил 2.4.5, и ошибка все еще там, поэтому нам, вероятно, придется ждать 3.0 на Databricks.
Создал PR, чтобы вернуть это обратно в ветку 2.4: github.com/apache/spark/pull/28238
@FokkoDriesprong, похоже, они не согласны с тем, что это ошибка, потому что Range Join отсутствует в документации API Apache pySpark. Ну, я бы сказал, что .hint("range_join", param)
вызывает исключение, которое не говорит, что "range_join"
является недопустимым типом подсказки, а скорее говорит, что param
просто неправильный тип, предполагая, что все еще существуют правильные типы для param
. Таким образом, в основном это позволяет использовать любой "hint_type"
независимо от того, знает ли JVM, как с этим обращаться или нет. Мне это кажется в лучшем случае плохой обработкой исключений. Я понимаю, что вы можете определить свои собственные подсказки, это открытый исходный код, но...
@FokkoDriesprong продолжение ... код никогда не достигает точки, когда понимает: «О, но этот тип подсказки нигде не определен в вашем коде, так что теперь ваше исключение». Хотя я не чувствую себя сильным в том, чтобы настаивать на этом.
У меня такая же проблема.