У меня есть DataFrame, который состоит из столбца ArrayType, и массив может иметь разную длину в каждой строке данных. Ниже я привел пример кода, который может создать некоторые фиктивные данные с аналогичной структурой.
Вы увидите, что для одной транзакции у меня есть идентификатор транзакции, а также некоторые дополнительные данные, каждая из которых хранится в «сегменте». Здесь мы видим один сегмент, в котором хранится информация о клиенте (всегда массив длины два), и у нас есть дополнительный сегмент для каждой покупки товара. Сама информация о купленном товаре представляет собой массив различной длины; первые два элемента массива всегда будут идентификатором и названием купленного товара; дополнительные элементы массива могут существовать для цвета и т. д., но мы можем игнорировать их в этом случае использования.
val dfschema = new StructType()
.add("transaction",
new StructType()
.add(
"transaction_id",
StringType
)
.add(
"segments",
ArrayType(
new StructType()
.add("segment_id",StringType)
.add("segment_fields",ArrayType(
StringType,
false
)
), false
)
)
)
val mockdata = Seq(
Row(
Row(
"2e6d57769e49ae8cb0c4105548c4389d",
List(
Row(
"CustomerInformation",
List(
"SomeCustomerName",
"SomeCustomerEmail"
)
),
Row(
"ItemPurchased",
List(
"SomeItemID",
"SomeItemName"
)
),
Row(
"ItemPurchased",
List(
"AnotherItemID",
"AnotherItemName",
"ItemColor"
)
),
Row(
"ItemPurchased",
List(
"YetAnotherItemID",
"YetAnotherItemName",
"ItemColor"
)
)
)
)
)
)
val df = spark.createDataFrame(
spark.sparkContext.parallelize(mockdata),
dfschema)
Чего я хочу добиться, так это преобразовать приведенное выше в другой фрейм данных с двумя столбцами: один для имени клиента и один для имени элемента. Для приведенного выше примера он хотел бы:
Имя Клиента | название предмета |
---|---|
SomeCustomerName | SomeItemName |
SomeCustomerName | Другое имя элемента |
SomeCustomerName | ещеназваниеэлемента |
Однако я не хочу жестко кодировать поля данных, которые я извлекаю; вместо этого я хочу написать пару функций, которые вы могли бы запускать как часть команды select, например:
df(
select(
get_single_subsegment("CustomerInformation", 0),
get_repeated_subsements("ItemPurchased", 1)
)
)
Таким образом, если я решу получить адрес электронной почты клиента вместо имени, мне просто нужно изменить изменение с 0 на 1 в приведенном выше примере. И я даже могу передать номер индекса как переменную.
Можно ли это сделать?
Начиная со Spark 3.0, вы можете использовать встроенные функции spark для определения двух ваших функций get_single_subsegment
и get_repeated_subsegments
.
Для get_single_subsegment
вы можете сначала отфильтровать массив сегментов по segment_id с помощью filter
, затем получить первый элемент этого отфильтрованного массива с помощью getItem
, а затем получить элемент с нужным индексом в этом объекте сегмента с помощью getField
и getItem
:
import org.apache.spark.sql.functions.{col, filter}
import org.apache.spark.sql.Column
def get_single_subsegment(segmentId: String, index: Int): Column = {
filter(col("transaction.segments"), c => c.getField("segment_id") === segmentId)
.getItem(0)
.getField("segment_fields")
.getItem(index)
}
Для get_repeated_subsegments
вы сначала фильтруете, как в get_single_subsegment
, но затем используете transform
для извлечения индекса полей правого сегмента для каждого элемента отфильтрованного массива, а затем explode
этого массива, чтобы иметь одну строку за элементом отфильтрованного массива:
import org.apache.spark.sql.functions.{col, explode, filter, transform}
import org.apache.spark.sql.Column
def get_repeated_subsegments(segmentId: String, index: Int): Column = {
explode(
transform(
filter(col("transaction.segments"), c => c.getField("segment_id") === segmentId)
.getField("segment_fields"),
c => c.getItem(index)
)
)
}
Если мы применим две функции, определенные выше, к вашему примеру, мы получим следующий результат:
df.select(
get_single_subsegment("CustomerInformation", 0).as("customer_name"),
get_repeated_subsegments("ItemPurchased", 1).as("item_name")
).show(false)
// +----------------+------------------+
// |customer_name |item_name |
// +----------------+------------------+
// |SomeCustomerName|SomeItemName |
// |SomeCustomerName|AnotherItemName |
// |SomeCustomerName|YetAnotherItemName|
// +----------------+------------------+
Если, как вы прокомментировали, вы хотите извлечь несколько столбцов с помощью get_repeated_subsegments
, вам нужно изменить get_repeated_subsegments
, чтобы не выполнять explode
в нем, а когда вы выполняете выбор. Затем вы можете добавить несколько столбцов, используя arrays_zip
для массивов, полученных путем применения get_repeated_subsegments
, следующим образом:
import org.apache.spark.sql.functions.{arrays_zip, col, explode, filter, transform}
import org.apache.spark.sql.Column
def get_repeated_subsegments(segmentId: String, index: Int): Column = {
transform(
filter(col("transaction.segments"), c => c.getField("segment_id") === segmentId)
.getField("segment_fields"),
c => c.getItem(index)
)
}
df.select(
get_single_subsegment("CustomerInformation", 0).as("customer_name"),
explode(
arrays_zip(
get_repeated_subsegments("ItemPurchased", 0).as("item_id"),
get_repeated_subsegments("ItemPurchased", 1).as("item_name")
)
).alias("items")
)
.select("customer_name", "items.*")
.show(false)
// +----------------+----------------+------------------+
// |customer_name |item_id |item_name |
// +----------------+----------------+------------------+
// |SomeCustomerName|SomeItemID |SomeItemName |
// |SomeCustomerName|AnotherItemID |AnotherItemName |
// |SomeCustomerName|YetAnotherItemID|YetAnotherItemName|
// +----------------+----------------+------------------+
Обновлен ответ с решением для извлечения двух столбцов вместо одного с помощью get_repeated_subsegments
Спасибо еще раз! Можете ли вы указать мне документацию о функции arrays_zip где-нибудь в Интернете? Я не могу найти его на spark.apache.org/docs/latest/api/scala/org/apache/spark/…, и хотя я нахожу некоторые документы для pyspark, я не нахожу ничего конкретно для Scala.
Вот документация для arrays_zip
Это превосходно, большое спасибо! Один дополнительный вопрос: что, если я хочу извлечь как идентификатор элемента, так и его имя, и получить вывод с тремя столбцами (customer_name, item_id, item_name)? Я дважды пытался вызвать get_repeated_subsegments, но Scala это не понравилось!