Можно ли добавить / заменить существующее выражение столбца в DataFrame API / SQL с использованием точки расширения.
Пример: предположим, что мы вводим правило разрешения, которое может проверить проект. узел из плана и при проверке наличия столбца "имя" замените его с верхним (имя), например.
Возможно ли такое при использовании точек расширения. Примеры, которые у меня есть найденные в основном простые, которые не управляют входными выражениями так, как мне нужно.
Пожалуйста, дайте мне знать, возможно ли это.
@prakharjain - Мой вариант использования - изменить запрос на основе некоторых условий. Изменить клиентскую программу невозможно. Итак, исследуем катализатор прямо сейчас. Верхний регистр - это просто пример. Это могло быть что угодно.
@BhanupalSinghRathore Пожалуйста, проверьте ответ ниже и убедитесь, что это то, что вы ищете.
Да, это возможно.
Возьмем пример. Предположим, мы хотим написать правило, которое проверяет наличие оператора Project, и если проект предназначен для определенного столбца (скажем, 'column2'), оно умножает его на 2.
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.Column
import org.apache.spark.sql.types._
object DoubleColumn2OptimizationRule extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case p: Project =>
if (p.projectList.filter(_.name == "column2").size >= 1) {
val newList = p.projectList.map { case x =>
if (x.name == "column2") {
Alias(Multiply(Literal(2, IntegerType), x), "column2_doubled")()
} else {
x
}
}
p.copy(projectList = newList)
} else {
p
}
}
}
скажем, у нас есть таблица table1, в которой есть два столбца - column1, column2.
Без этого правила -
> spark.sql("select column2 from table1 limit 10").collect()
Array([1], [2], [3], [4], [5], [6], [7], [8], [9], [10])
с этим правилом -
> spark.experimental.extraOptimizations = Seq(DoubleColumn2OptimizationRule)
> spark.sql("select column2 from table1 limit 10").collect()
Array([2], [4], [6], [8], [10], [12], [14], [16], [18], [20])
Также вы можете вызвать объяснение в DataFrame, чтобы проверить план -
> spark.sql("select column2 from table1 limit 10").explain == Physical Plan == CollectLimit 10 +- *(1) LocalLimit 10 +- *(1) Project [(2 * column2#213) AS column2_doubled#214] +- HiveTableScan [column2#213], HiveTableRelation `default`.`table1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [column1#212, column2#213]
Спасибо за подробный ответ. Я работал с UDF для столбцов, а не для буквальных значений. В моем случае проблемой было использование AttributeExpression. Однако я посмотрел на пример и обернул свой код псевдонимом, и он отлично работает.
Почему вы хотите преобразовать имя атрибута проекта в верхний регистр? Можете ли вы рассказать о сценарии использования?